yangys
2024-10-30 c27b939fa5fa6ce4d712f7e9ced2ad811d69d5ec
smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -1,6 +1,8 @@
package com.qianwen.smartman.common.config;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -22,10 +24,20 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.core.websocket.distribute.MessageDO;
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
@Configuration
public class MdcMqttConfig {
@@ -38,7 +50,16 @@
    // Injected from the "mqtt.password" property; falls back to an empty string when the property is not set.
    @Value("${mqtt.password:}")
    private String mqttPassword;
    // Injected from the "mqtt.isRecevieMsg" property; defaults to false.
    // NOTE(review): in the visible code this flag is only read by the commented-out inbound() adapter,
    // where it selects which topic is subscribed — confirm whether anything else still uses it.
    @Value("${mqtt.isRecevieMsg:false}")
    private boolean isRecevieMsg;
    // Used by handler() to distribute the websocket message built from incoming real-time data.
    @Autowired
    private RedisMessageDistributor messageDistributor;
    //@Autowired
    //private SuperProcessParameterMapper processParamMapper;
    // Used by handler(): addPreTimeInDeviceStatus(...) is called on the real-time data map before it is sent.
    @Autowired
    private RealTimeDataService realTimeDataService;
    // NOTE(review): in the visible code this topic is only referenced by the commented-out inbound() adapter.
    public static final String DEFAULT_TOPIC = "mdc";
   /**
    * 反馈创建的topic(mdc中),本应用接收并处理
@@ -47,6 +68,7 @@
   
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; // NOTE(review): only referenced by the commented-out inbound() in the visible code
   
   // Topic whose messages handler() processes as real-time workstation data.
   public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
   @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -70,19 +92,29 @@
     * 
     * @return
     */
    /*
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数
       java.util.Random r = new java.util.Random();
       String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
        MqttPahoMessageDrivenChannelAdapter adapter;
        if(!this.isRecevieMsg) {
           adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                    mqttClientFactory(),DEFAULT_TOPIC);
       }else {
          adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                            mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
       }
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    */
    
    /**
     * 入站消息处理
@@ -91,10 +123,42 @@
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
           String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
           if(FEEDBACK_TOPIC.equals(topic) ) {
           if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
              //实时数据来了,数据从collect发送来
              String payload = (String)message.getPayload();
              logger.info("接收到mqtt消息readtime,data={}",payload);//消息体如何定义,数据点名称,值,时间,用json对象传过来
              //TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
              JSONObject payLoadJson = JSONObject.parseObject(payload);
              long workstationId = payLoadJson.getLong("workstationId");
              String name = payLoadJson.getString("name");//key?
              long time = payLoadJson.getLong("time");
              String v = payLoadJson.getString("value");
              //将数据加入缓存
              TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
                WorkstationCache.setWorkstationRealTime(workstationId, name, telemetryDataResponseDTO);
                //发送websocket消息
                RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
                jsonWebSocketMessage.setId(""+workstationId);
                //从缓存(或直接查询)加载最新的数据,发送websocket
                Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
                realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
                jsonWebSocketMessage.setData(map);
                MessageDO messageDO = new MessageDO();
                messageDO.setNeedBroadcast(false);
                String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
                logger.info("websockettxt={}",msgtxt);
                messageDO.setMessageText(msgtxt);
                messageDistributor.distribute(messageDO);
           }
        };
    }