yangys
2025-11-05 10c5247722995e571b3fd4dbffb178964a9bd6ee
smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -2,6 +2,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -28,7 +29,11 @@
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
@@ -45,15 +50,21 @@
    /** MQTT password, bound to the {@code mqtt.password} property; the empty default after the colon applies when the property is not set. */
    @Value("${mqtt.password:}")
    private String mqttPassword;
    /**
     * Switch bound to the {@code mqtt.isRecevieMsg} property (default {@code false}).
     * The inbound adapter subscribes to {@link #DEFAULT_TOPIC} when this is false and to
     * {@code WOCKSTATION_REALTIMEDATA_TOPIC} when it is true.
     * The spelling "Recevie" is part of the property key, so renaming it requires a matching configuration change.
     */
    @Value("${mqtt.isRecevieMsg:false}")
    private boolean isRecevieMsg;
    /** Distributor through which the message handler hands its websocket JSON message on. */
    @Autowired
    private RedisMessageDistributor messageDistributor;
    
    // Mapper injection is currently disabled; kept for reference.
    //@Autowired
    //private SuperProcessParameterMapper processParamMapper;
    /** Service the message handler calls ({@code addPreTimeInDeviceStatus}) on the cached real-time map before it is sent. */
    @Autowired
    private RealTimeDataService realTimeDataService;
    /** Topic the inbound adapter subscribes to when {@code isRecevieMsg} is false. */
    public static final String DEFAULT_TOPIC = "mdc";
   /**
    * Topic on which creation feedback is published (within mdc); this application receives and handles it.
    */
   public static final String FEEDBACK_TOPIC = "mdc/feedback";
   //public static final String FEEDBACK_TOPIC = "mdc/feedback";
   
   /** Topic name for workstation creation. NOTE(review): where it is published or consumed is not visible here — confirm against callers. */
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
   
@@ -88,9 +99,15 @@
       
       String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
       
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId,
                        mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
        MqttPahoMessageDrivenChannelAdapter adapter;
        if(!this.isRecevieMsg) {
           adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                    mqttClientFactory(),DEFAULT_TOPIC);
       }else {
          adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                            mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
       }
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
@@ -106,6 +123,7 @@
    /**
     * Creates the {@link MessageHandler} that is activated for messages on the
     * {@code mqttInputChannel} channel.
     *
     * <p>Only messages whose received-topic header equals
     * {@code WOCKSTATION_REALTIMEDATA_TOPIC} are processed in the visible code; for those the
     * String payload is parsed as JSON, the {@code workstationId}, {@code name}, {@code time}
     * and {@code value} entries are read, and a websocket JSON message is handed to
     * {@code messageDistributor}. No other topic is handled here.
     *
     * <p>NOTE(review): this listing contains {@code @@} hunk markers and what look like two
     * versions of several statements (the {@code data} / {@code payLoadJson} reads, and two
     * {@code MessageDO messageDO} declarations). Duplicate local declarations in one scope do
     * not compile, so confirm which version is current before relying on the comments below.
     *
     * @return the handler lambda bound to {@code mqttInputChannel}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
           // String.valueOf turns a missing header into the text "null", so the topic check below just fails for it.
           String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
           if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
@@ -113,11 +131,11 @@
              // The payload is cast to String without a type check.
              String payload = (String)message.getPayload();
              logger.info("接收到mqtt消息readtime,data={}",payload);// message body layout: data-point name, value and time, passed as a JSON object
              //TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
              // NOTE(review): the next five statements and the five after them read the same keys under
              // different local names (data vs payLoadJson) — presumably only one set is current.
              // NOTE(review): if getLong returns a boxed Long (as in fastjson), a missing key would throw
              // a NullPointerException when unboxed into long — TODO confirm the JSON library and payload contract.
              JSONObject data = JSONObject.parseObject(payload);
              long workstationId = data.getLong("workstationId");
              String name = data.getString("name");//key?
              long time = data.getLong("time");
              String v = data.getString("value");
              JSONObject payLoadJson = JSONObject.parseObject(payload);
              long workstationId = payLoadJson.getLong("workstationId");
              String name = payLoadJson.getString("name");//key?
              long time = payLoadJson.getLong("time");
              String v = payLoadJson.getString("value");
              
              // Add the data to the cache (the caching call itself is not visible in this listing).
              TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
@@ -126,17 +144,21 @@
                // Send the websocket message, identified by the workstation id rendered as text.
                RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
                jsonWebSocketMessage.setId(""+workstationId);
                // Single-entry map: data-point name -> value/time DTO.
                jsonWebSocketMessage.setData(new HashMap<String, Object>() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1
                    {
                        put(name, telemetryDataResponseDTO);
                    }
                });
                //RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class);
                // NOTE(review): this MessageDO sequence and the one further down both declare messageDO — presumably two versions of the same step.
                MessageDO messageDO = new MessageDO();
                messageDO.setNeedBroadcast(Boolean.FALSE);
                messageDO.setMessageText(JSONUtil.toJsonStr(jsonWebSocketMessage));
                messageDistributor.distribute(messageDO);
                
                // Load the latest data from the cache (or by direct query) and send it over websocket.
                Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
                // NOTE(review): presumably adds device-status timing information to the map — confirm in RealTimeDataService.
                realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
                // Replaces the single-entry map set above with the cached map.
                jsonWebSocketMessage.setData(map);
                MessageDO messageDO = new MessageDO();
                messageDO.setNeedBroadcast(false);
                String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
                // The full outgoing JSON text is logged at info level for every processed message.
                logger.info("send_to_page_websockettxt={}",msgtxt);
                messageDO.setMessageText(msgtxt);
                messageDistributor.distribute(messageDO);
           }
        };
    }