yangys
2025-11-05 10c5247722995e571b3fd4dbffb178964a9bd6ee
smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -29,7 +29,7 @@
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
@@ -50,7 +50,8 @@
    // MQTT password, injected from the "mqtt.password" property; the empty
    // default after the colon means an unset property yields an empty string.
    @Value("${mqtt.password:}")
    private String mqttPassword;
    // Injected from "mqtt.isRecevieMsg" (default false). The inbound adapter
    // setup in this class subscribes to WOCKSTATION_REALTIMEDATA_TOPIC only when
    // this flag is true, and to DEFAULT_TOPIC otherwise.
    // NOTE(review): "Recevie" is misspelled, but it is part of the property key,
    // so renaming it would break existing configuration files.
    @Value("${mqtt.isRecevieMsg:false}")
    private boolean isRecevieMsg;
    // Distributor that handler() hands the outgoing MessageDO to via distribute().
    @Autowired
    private RedisMessageDistributor messageDistributor;
    
@@ -63,7 +64,7 @@
   /**
    * Feedback topic created on the mdc side; this application receives and
    * processes the messages published to it.
    */
   public static final String FEEDBACK_TOPIC = "mdc/feedback";
   //public static final String FEEDBACK_TOPIC = "mdc/feedback";
   
   // Topic string "mdc/workstation-create"; its publishers and subscribers are
   // not visible in this excerpt.
   // NOTE(review): "WOCKSTATION" is presumably a typo for "WORKSTATION", but the
   // constant is public, so the name is left unchanged.
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
   
@@ -98,9 +99,15 @@
       
       String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
       
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId,
                        mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
        MqttPahoMessageDrivenChannelAdapter adapter;
        if(!this.isRecevieMsg) {
           adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                    mqttClientFactory(),DEFAULT_TOPIC);
       }else {
          adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                            mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
       }
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
@@ -116,6 +123,7 @@
    /**
     * Creates the message handler attached to the "mqttInputChannel" channel.
     *
     * <p>For each incoming message the handler reads the MQTT received-topic
     * header and only acts when it equals WOCKSTATION_REALTIMEDATA_TOPIC. In the
     * visible part of that branch it loads the workstation's real-time map from
     * WorkstationCache, passes it to
     * realTimeDataService.addPreTimeInDeviceStatus, stores the map as the data of
     * the WebSocket message, serializes that message to JSON, logs the text, and
     * hands a non-broadcast MessageDO carrying the text to messageDistributor.
     *
     * <p>NOTE(review): the lines between the topic check and the cache lookup are
     * not part of this excerpt; workstationId and jsonWebSocketMessage are
     * defined there.
     *
     * @return the handler lambda invoked for every message on the input channel
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
           // String.valueOf turns a missing header into the text "null", so the
           // equals check below simply fails instead of throwing.
           String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
           if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
@@ -142,43 +150,15 @@
                // The cache is keyed by the workstation id rendered as a string.
                Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
                
                // Presumably replaces the disabled inline logic below (adds the
                // start time of the current DeviceStatus) - confirm in the service.
                realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
                /*
                // Find the status data point and attach the previous status's timestamp
                final String statusDpName = "DeviceStatus";
                if(map.containsKey(statusDpName)) {
                   TelemetryDataResponseDTO statusDTO = (TelemetryDataResponseDTO) map.get(statusDpName);
                   JSONObject statusJson = new JSONObject();
                   statusJson.put("t", statusDTO.getT());
                   statusJson.put("v", statusDTO.getV());
                   long preT = statusDTO.getT();
                   // The last record whose value (v) differs from the current one
                   ProcessParameterVO diffStatusVO = processParamMapper.lastParameterNotEqValue(workstationId, statusDpName, statusDTO.getV());
                   ProcessParameterVO tempStatusVO;
                   if(diffStatusVO != null) {
                      tempStatusVO = processParamMapper.firstParameterEqValueGtTime(workstationId, statusDpName, statusDTO.getV(), diffStatusVO.getTime().getTime());
                   }else {
                      tempStatusVO = processParamMapper.firstParameterEqValue(workstationId, statusDpName, statusDTO.getV());
                   }
                   if(tempStatusVO != null) {
                      preT = tempStatusVO.getTime().getTime();
                   }
                   statusJson.put("preT", preT);
                   map.put(statusDpName, statusJson);// overwrite the original DeviceStatus entry
                }
                */
                
                jsonWebSocketMessage.setData(map);
                MessageDO messageDO = new MessageDO();
                messageDO.setNeedBroadcast(false);
                String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
                // The same payload is logged under two different message keys.
                logger.info("websockettxt={}",msgtxt);
                logger.info("send_to_page_websockettxt={}",msgtxt);
                messageDO.setMessageText(msgtxt);
               
                messageDistributor.distribute(messageDO);
           }
        };
    }