yangys
2024-11-21 fe82f1f9a9be911d1420fe3b018ea85dd5fff1a3
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -21,9 +21,10 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.WorkstationAppMappingService;
import com.qianwen.mdc.collect.service.WorkstationDatapointsService;
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
import cn.hutool.core.date.DateTime;
@@ -40,34 +41,46 @@
    // MQTT password; the placeholder falls back to an empty string when mqtt.password is not set.
    @Value("${mqtt.password:}")
    private String mqttPassword;
    // Value passed to MqttConnectOptions.setConnectionTimeout in mqttClientFactory(); defaults to 1000.
    // NOTE(review): Paho documents that setter's unit as seconds — confirm 1000 is really intended.
    @Value("${mqtt.timeout:1000}")
    private int timeout;
   // Injected services; the visible handler code calls recService.handle(...) and
   // stateFixPointService.deviceStateFixPoint(...) when dispatching inbound messages.
   @Autowired
   private IOTMqttReceiveService recService;
   @Autowired
    private DeviceStateFixPointService stateFixPointService;
   @Autowired
    private WorkstationFeedbackService workstationFeedbackService;
   
   // NOTE(review): the next declarations look like a diff with the +/- markers stripped:
   // COLLECT_DATA_TOPIC is declared both as a constant here and as a @Value field below,
   // and two service fields follow a single @Autowired. Presumably only one of each pair
   // exists in the real file — verify against the actual source before editing.
   @Autowired
    private WorkstationAppMappingService workstationAppMappingService;
   public static final String COLLECT_DATA_TOPIC = "forward/test";
    private WorkstationDatapointsService dpService;
   /**
    * MQTT topic on which collected data is received; it is configured on the IoT platform.
    * The placeholder defaults to an empty string when mqtt.dataReceiveTopic is not set.
    */
   @Value("${mqtt.dataReceiveTopic:}")
   public String COLLECT_DATA_TOPIC;
   
   /**
    * Topic for feedback-created messages (published by mdc); this application receives and handles them.
    */
   public static final String FEEDBACK_TOPIC = "mdc/feedback";
   
   /**
    * Workstation-created message, sent by mdc when a new workstation is created; this application
    * receives and handles it.
    * NOTE(review): the constant name is misspelled ("WOCKSTATION"); it is public, so renaming it
    * would need a check of all external usages.
    */
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
   
   // Topic for workstation/appId mapping changes; the visible handler branch reacts by calling
   // workstationAppMappingService.saveToCache().
   private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed";
   /**
    * Workstation data points changed. The visible handler branch parses the payload as JSON and
    * evicts the data-point cache for the payload's "appId" via dpService.datapointsCacheEvict(...).
    */
   private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed";
   
   @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" 
        options.setConnectionTimeout(timeout);
        
        if(ObjectUtil.isNotEmpty(mqttUserName)) {
           options.setUserName(mqttUserName);
@@ -93,7 +106,7 @@
       String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId,
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//最后一个参数允许多个topic参数
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//最后一个参数允许多个topic参数
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
@@ -123,10 +136,14 @@
              logger.info("工位创建接收消息={}",workstationId);
              stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
              recService.handle((String)message.getPayload());
           }else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) {
              String workstationId = (String)message.getPayload();
              logger.info("工位appId映射变化消息={}",workstationId);
              workstationAppMappingService.saveToCache();
           }else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) {
              String payload = (String)message.getPayload();
              logger.info("工位appId映射变化消息={}",payload);
              //workstationAppMappingService.saveToCache();
              //清除该工位的数据点缓存
              JSONObject payloadObj = JSONObject.parseObject(payload);
              //payloadObj.getLong("workstationId");
              dpService.datapointsCacheEvict(payloadObj.getString("appId"));
           } else {//订阅了几个topic就会接收到几个,其他的不会进来
              logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload());
           }