package com.qianwen.smartman.modules.dmpLog.message.consumer; import cn.hutool.log.StaticLog; import com.alibaba.fastjson.JSONObject; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Objects; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import com.qianwen.core.tool.utils.Func; import com.qianwen.smartman.modules.dmpLog.constant.DmpLogSyncConstant; import com.qianwen.smartman.modules.dmpLog.dto.DmpLogMessage; import com.qianwen.smartman.modules.dmpLog.entity.SignalFlowLog; import com.qianwen.smartman.modules.dmpLog.entity.SignalLog; import com.qianwen.smartman.modules.dmpLog.entity.SignalMethodTimeLog; import com.qianwen.smartman.modules.dmpLog.enums.LogTypeEnum; import com.qianwen.smartman.modules.dmpLog.mapper.DmpLogSignalFlowMapper; import com.qianwen.smartman.modules.dmpLog.mapper.DmpLogSignalMapper; import com.qianwen.smartman.modules.dmpLog.mapper.DmpLogSignalMethodTimeMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 利用rockmq,从日志中心接受日志信息存入本地库tdengine中 */ @RocketMQMessageListener(consumerGroup = DmpLogSyncConstant.CONSUMER_GROUP, topic = DmpLogSyncConstant.DMP_LOG_TOPIC) @Component public class DmpLogSyncConsumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener { private DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMM"); @Autowired private DmpLogSignalMapper dmpLogSignalMapper; @Autowired private DmpLogSignalFlowMapper dmpLogSignalFlowMapper; @Autowired private DmpLogSignalMethodTimeMapper dmpLogSignalMethodTimeMapper; public void onMessage(DmpLogMessage dmpLogMessage) { JSONObject jsonObject = dmpLogMessage.getMessage(); if (Func.isNotEmpty(jsonObject)) { Integer msgType = jsonObject.getInteger("Type"); try { switch ((LogTypeEnum) Objects.requireNonNull(LogTypeEnum.getValue(msgType))) { case Signal: SignalLog signalLog = (SignalLog) jsonObject.toJavaObject(SignalLog.class); signalLog.setCollectMonth(Integer.valueOf(getYearMonth(signalLog.getTimeStamp()))); this.dmpLogSignalMapper.save(signalLog); break; case Signal_Flow: SignalFlowLog signalFlowLog = (SignalFlowLog) jsonObject.toJavaObject(SignalFlowLog.class); signalFlowLog.setCollectMonth(Integer.valueOf(getYearMonth(signalFlowLog.getTimeStamp()))); this.dmpLogSignalFlowMapper.save(signalFlowLog); break; case HanlderInnerMethod: SignalMethodTimeLog signalMethodTimeLog = (SignalMethodTimeLog) jsonObject.toJavaObject(SignalMethodTimeLog.class); signalMethodTimeLog.setCollectMonth(Integer.valueOf(getYearMonth(signalMethodTimeLog.getTimeStamp()))); this.dmpLogSignalMethodTimeMapper.save(signalMethodTimeLog); break; } } catch (Exception e) { StaticLog.error("存储dmplog日志异常,信息为:{}", new Object[]{e}); } } } private int getYearMonth(Long timestamp) { ZoneId zone = ZoneId.systemDefault(); LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp.longValue()), zone); Integer rangeParam = Integer.valueOf(localDateTime.format(this.fmt)); return rangeParam.intValue(); } public void prepareStart(DefaultMQPushConsumer consumer) { consumer.setMaxReconsumeTimes(3); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); } }