yangys
2024-05-18 cc0bdfb33ef638dfafe3185c92c7076d815e1c9b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.qianwen.smartman.modules.cps.message.consumer;
 
import cn.hutool.json.JSONUtil;
import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
import com.qianwen.smartman.common.cache.cps.WorkstationOfMachineCache;
import com.qianwen.smartman.common.utils.CommonUtil;
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
import com.qianwen.core.tool.utils.Func;
import com.qianwen.core.tool.utils.SpringUtil;
import com.qianwen.core.websocket.distribute.MessageDO;
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
import com.qianwen.smartman.modules.cps.dto.WorkstationParamTypeDTO;
import com.qianwen.smartman.modules.cps.entity.WorkstationOfMachine;
import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.cps.message.event.TelemetryDataMessage;
import com.qianwen.smartman.modules.cps.utils.TelemetryDataUtils;
import org.springframework.stereotype.Component;
 
/**
 * rockmq消息消费者 telemetry(遥测)
 */
@RocketMQMessageListener(topic = "telemetry-data", consumerGroup = "telemetry-data-real-time-consumer", consumeMode = ConsumeMode.ORDERLY)
@Component
public class TelemetryDataRealTimeConsumer implements RocketMQListener<TelemetryDataMessage>, RocketMQPushConsumerLifecycleListener {
    private static final Logger log = LoggerFactory.getLogger(TelemetryDataRealTimeConsumer.class);
 
    public void onMessage(TelemetryDataMessage telemetryDataMessage) {
        List<WorkstationOfMachine> workStationOfMachineList = WorkstationOfMachineCache.getList(telemetryDataMessage.getMachineId());
        if (Func.isEmpty(workStationOfMachineList)) {
            log.info("[采集数据] 对应得采集设备{}没有绑定任何工位,直接丢弃", telemetryDataMessage);
            return;
        }
        List<WorkstationParamTypeDTO> workstationParamTypeDTOList = WorkstationCache.getWorkstationParamTypeCache(telemetryDataMessage.getMachineId(), telemetryDataMessage.getName());
        if (Func.isNotEmpty(workstationParamTypeDTOList)) {
            workstationParamTypeDTOList.forEach(workStationItem -> {
                final TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
                if (Func.isNotEmpty(result)) {
                    final TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(result.getValue(), result.getTimestamp());
                    WorkstationCache.setWorkstationRealTime(workStationItem.getWorkstationId(), result.getName(), telemetryDataResponseDTO);
                    RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
                    jsonWebSocketMessage.setId(workStationItem.getWorkstationId().toString());
                    jsonWebSocketMessage.setData(new HashMap<String, Object>() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1
                        {
                            put(result.getName(), telemetryDataResponseDTO);
                        }
                    });
                    RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class);
                    MessageDO messageDO = new MessageDO();
                    messageDO.setNeedBroadcast(Boolean.FALSE);
                    messageDO.setMessageText(JSONUtil.toJsonStr(jsonWebSocketMessage));
                    messageDistributor.distribute(messageDO);
                }
            });
        }
    }
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setMaxReconsumeTimes(1);
        consumer.setInstanceName(CommonUtil.changeInstanceNameToPID(consumer));
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }
 
    
}