package com.qianwen.smartman.modules.cps.message.consumer; import cn.hutool.json.JSONUtil; import java.util.HashMap; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.qianwen.smartman.common.cache.cps.WorkstationCache; import com.qianwen.smartman.common.cache.cps.WorkstationOfMachineCache; import com.qianwen.smartman.common.utils.CommonUtil; import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage; import com.qianwen.core.tool.utils.Func; import com.qianwen.core.tool.utils.SpringUtil; import com.qianwen.core.websocket.distribute.MessageDO; import com.qianwen.core.websocket.distribute.RedisMessageDistributor; import com.qianwen.smartman.modules.cps.dto.WorkstationParamTypeDTO; import com.qianwen.smartman.modules.cps.entity.WorkstationOfMachine; import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO; import com.qianwen.smartman.modules.cps.message.event.TelemetryDataMessage; import com.qianwen.smartman.modules.cps.utils.TelemetryDataUtils; import org.springframework.stereotype.Component; /** * rockmq消息消费者 telemetry(遥测) */ @RocketMQMessageListener(topic = "telemetry-data", consumerGroup = "telemetry-data-real-time-consumer", consumeMode = ConsumeMode.ORDERLY) @Component public class TelemetryDataRealTimeConsumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener { private static final Logger log = LoggerFactory.getLogger(TelemetryDataRealTimeConsumer.class); public void onMessage(TelemetryDataMessage telemetryDataMessage) { List workStationOfMachineList = WorkstationOfMachineCache.getList(telemetryDataMessage.getMachineId()); if (Func.isEmpty(workStationOfMachineList)) { log.info("[采集数据] 对应得采集设备{}没有绑定任何工位,直接丢弃", telemetryDataMessage); return; } List workstationParamTypeDTOList = WorkstationCache.getWorkstationParamTypeCache(telemetryDataMessage.getMachineId(), telemetryDataMessage.getName()); if (Func.isNotEmpty(workstationParamTypeDTOList)) { workstationParamTypeDTOList.forEach(workStationItem -> { final TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem); if (Func.isNotEmpty(result)) { final TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(result.getValue(), result.getTimestamp()); WorkstationCache.setWorkstationRealTime(workStationItem.getWorkstationId(), result.getName(), telemetryDataResponseDTO); RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage(); jsonWebSocketMessage.setId(workStationItem.getWorkstationId().toString()); jsonWebSocketMessage.setData(new HashMap() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1 { put(result.getName(), telemetryDataResponseDTO); } }); RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class); MessageDO messageDO = new MessageDO(); messageDO.setNeedBroadcast(Boolean.FALSE); messageDO.setMessageText(JSONUtil.toJsonStr(jsonWebSocketMessage)); messageDistributor.distribute(messageDO); } }); } } @Override public void prepareStart(DefaultMQPushConsumer consumer) { consumer.setMaxReconsumeTimes(1); consumer.setInstanceName(CommonUtil.changeInstanceNameToPID(consumer)); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); } }