package com.qianwen.smartman.modules.cps.message.consumer;
|
|
import cn.hutool.json.JSONUtil;
|
import java.util.HashMap;
|
import java.util.List;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
|
import com.qianwen.smartman.common.cache.cps.WorkstationOfMachineCache;
|
import com.qianwen.smartman.common.utils.CommonUtil;
|
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
|
import com.qianwen.core.tool.utils.Func;
|
import com.qianwen.core.tool.utils.SpringUtil;
|
import com.qianwen.core.websocket.distribute.MessageDO;
|
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
|
import com.qianwen.smartman.modules.cps.dto.WorkstationParamTypeDTO;
|
import com.qianwen.smartman.modules.cps.entity.WorkstationOfMachine;
|
import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
|
import com.qianwen.smartman.modules.cps.message.event.TelemetryDataMessage;
|
import com.qianwen.smartman.modules.cps.utils.TelemetryDataUtils;
|
import org.springframework.stereotype.Component;
|
|
/**
|
* rockmq消息消费者 telemetry(遥测)
|
*/
|
@RocketMQMessageListener(topic = "telemetry-data", consumerGroup = "telemetry-data-real-time-consumer", consumeMode = ConsumeMode.ORDERLY)
|
@Component
|
public class TelemetryDataRealTimeConsumer implements RocketMQListener<TelemetryDataMessage>, RocketMQPushConsumerLifecycleListener {
|
private static final Logger log = LoggerFactory.getLogger(TelemetryDataRealTimeConsumer.class);
|
|
public void onMessage(TelemetryDataMessage telemetryDataMessage) {
|
List<WorkstationOfMachine> workStationOfMachineList = WorkstationOfMachineCache.getList(telemetryDataMessage.getMachineId());
|
if (Func.isEmpty(workStationOfMachineList)) {
|
log.info("[采集数据] 对应得采集设备{}没有绑定任何工位,直接丢弃", telemetryDataMessage);
|
return;
|
}
|
List<WorkstationParamTypeDTO> workstationParamTypeDTOList = WorkstationCache.getWorkstationParamTypeCache(telemetryDataMessage.getMachineId(), telemetryDataMessage.getName());
|
if (Func.isNotEmpty(workstationParamTypeDTOList)) {
|
workstationParamTypeDTOList.forEach(workStationItem -> {
|
final TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
|
if (Func.isNotEmpty(result)) {
|
final TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(result.getValue(), result.getTimestamp());
|
WorkstationCache.setWorkstationRealTime(workStationItem.getWorkstationId(), result.getName(), telemetryDataResponseDTO);
|
RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
|
jsonWebSocketMessage.setId(workStationItem.getWorkstationId().toString());
|
jsonWebSocketMessage.setData(new HashMap<String, Object>() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1
|
{
|
put(result.getName(), telemetryDataResponseDTO);
|
}
|
});
|
RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class);
|
MessageDO messageDO = new MessageDO();
|
messageDO.setNeedBroadcast(Boolean.FALSE);
|
messageDO.setMessageText(JSONUtil.toJsonStr(jsonWebSocketMessage));
|
messageDistributor.distribute(messageDO);
|
}
|
});
|
}
|
}
|
@Override
|
public void prepareStart(DefaultMQPushConsumer consumer) {
|
consumer.setMaxReconsumeTimes(1);
|
consumer.setInstanceName(CommonUtil.changeInstanceNameToPID(consumer));
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
}
|
|
|
}
|