yangys
2024-10-25 9faa74e1912022dc6e54c3e93426946876b5d83a
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -21,8 +21,10 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.WorkstationDatapointsService;
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
import cn.hutool.core.date.DateTime;
@@ -39,15 +41,20 @@
    @Value("${mqtt.password:}")
    private String mqttPassword;
   @Autowired
   private IOTMqttReceiveService recService;
   @Autowired
    private DeviceStateFixPointService stateFixPointService;
   @Autowired
    private WorkstationFeedbackService workstationFeedbackService;
   public static final String COLLECT_DATA_TOPIC = "forward/test";
   @Autowired
    private WorkstationDatapointsService dpService;
   /**
    * 接收数据的mqtt topic,在IOT平台配置的
    */
   @Value("${mqtt.dataReceiveTopic:}")
   public String COLLECT_DATA_TOPIC;
   
   /**
    * 反馈创建的topic(mdc中),本应用接收并处理
@@ -55,6 +62,11 @@
   public static final String FEEDBACK_TOPIC = "mdc/feedback";
   
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
   /**
    * 工位数据点变化
    */
   private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed";
   
   @Bean
    public MqttPahoClientFactory mqttClientFactory() {
@@ -81,9 +93,12 @@
     */
    @Bean
    public MessageProducer inbound() {
       java.util.Random r = new java.util.Random();
       String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数
                new MqttPahoMessageDrivenChannelAdapter(clientId,
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//最后一个参数允许多个topic参数
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
@@ -113,7 +128,15 @@
              logger.info("工位创建接收消息={}",workstationId);
              stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
              recService.handle((String)message.getPayload());
           }else {//订阅了几个topic就会接收到几个,其他的不会进来
           }else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) {
              String payload = (String)message.getPayload();
              logger.info("工位appId映射变化消息={}",payload);
              //workstationAppMappingService.saveToCache();
              //清除该工位的数据点缓存
              JSONObject payloadObj = JSONObject.parseObject(payload);
              //payloadObj.getLong("workstationId");
              dpService.datapointsCacheEvict(payloadObj.getString("appId"));
           } else {//订阅了几个topic就会接收到几个,其他的不会进来
              logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload());
           }
        };