yangys
2025-11-05 10c5247722995e571b3fd4dbffb178964a9bd6ee
smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -29,7 +29,11 @@
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
@@ -46,15 +50,21 @@
    /**
     * Value of the {@code mqtt.password} property; resolves to an empty string when the
     * property is not set. NOTE(review): presumably the MQTT broker credential — its use
     * is outside this section, confirm against the client factory.
     */
    @Value("${mqtt.password:}")
    private String mqttPassword;
    /**
     * Value of the {@code mqtt.isRecevieMsg} property, {@code false} when not set.
     * The inbound adapter setup in this class branches on it: when {@code false} the adapter
     * is created for {@link #DEFAULT_TOPIC}, otherwise for the workstation real-time data topic.
     */
    @Value("${mqtt.isRecevieMsg:false}")
    private boolean isRecevieMsg;
    /** Distributor the message handler uses to send out the serialized websocket message. */
    @Autowired
    private RedisMessageDistributor messageDistributor;
    
    //@Autowired
    //private SuperProcessParameterMapper processParamMapper;
    /**
     * Service the message handler calls ({@code addPreTimeInDeviceStatus}) on the cached
     * real-time data map before that map is sent to websocket clients.
     */
    @Autowired
    private RealTimeDataService realTimeDataService;
    /** Topic the inbound adapter is created with when {@code isRecevieMsg} is {@code false}. */
    public static final String DEFAULT_TOPIC = "mdc";
   /**
    * Topic for creation feedback (published within mdc); this application receives and
    * processes messages on it.
    */
   public static final String FEEDBACK_TOPIC = "mdc/feedback";
   //public static final String FEEDBACK_TOPIC = "mdc/feedback";
   
   /**
    * Topic string {@code mdc/workstation-create}.
    * NOTE(review): presumably used for workstation-creation messages — usage is not visible
    * in this section, confirm against the publisher/subscriber code.
    */
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
   
@@ -89,9 +99,15 @@
       
       String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
       
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId,
                        mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
        MqttPahoMessageDrivenChannelAdapter adapter;
        if(!this.isRecevieMsg) {
           adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                    mqttClientFactory(),DEFAULT_TOPIC);
       }else {
          adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
                            mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
       }
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
@@ -107,6 +123,7 @@
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
           String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
           if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
@@ -127,34 +144,21 @@
                // Send the websocket message
                RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
                jsonWebSocketMessage.setId(""+workstationId);
                /*
                jsonWebSocketMessage.setData(new HashMap<String, Object>() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1
                    {
                        put(name, telemetryDataResponseDTO);
                    }
                });
                */
                
                // Load the latest data from the cache (or by querying directly) and send it over websocket
                Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
                realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
                jsonWebSocketMessage.setData(map);
                //RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class);
                MessageDO messageDO = new MessageDO();
                messageDO.setNeedBroadcast(false);
                String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
                logger.info("websockettxt={}",msgtxt);
                logger.info("send_to_page_websockettxt={}",msgtxt);
                messageDO.setMessageText(msgtxt);
                /*
                JSONObject testMsg = new JSONObject();
                JSONObject msgdata = JSONObject.parseObject(JSONUtil.toJsonStr(map));
                testMsg.put("data", msgdata);
                testMsg.put("id", workstationId+"");
                String msgtxt = testMsg.toJSONString();
                */
               
                messageDistributor.distribute(messageDO);
           }
        };
    }