yangys
2024-09-27 26f8e5990686bdba2119024a260d986266506757
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -1,8 +1,12 @@
package com.qianwen.mdc.collect.config;
import java.util.Arrays;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
@@ -13,24 +17,55 @@
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
@Configuration
public class MqttConfig {
   private Logger logger = LoggerFactory.getLogger(this.getClass());
   @Value("${mqtt.host}")
    private String mqttHost;
    @Value("${mqtt.username:}")
    private String mqttUserName;
    @Value("${mqtt.password:}")
    private String mqttPassword;
   @Autowired
   private IOTMqttReceiveService recService;
   @Autowired
    private DeviceStateFixPointService stateFixPointService;
   @Autowired
    private WorkstationFeedbackService workstationFeedbackService;
   public static final String COLLECT_DATA_TOPIC = "forward/test";
   /**
    * 反馈创建的topic(mdc中),本应用接收并处理
    */
   public static final String FEEDBACK_TOPIC = "mdc/feedback";
   public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
   
   @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://82.156.1.83:1884" });
       // options.setUserName("your-username");
        //options.setPassword("your-password".toCharArray());
        options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884"
        if(ObjectUtil.isNotEmpty(mqttUserName)) {
           options.setUserName(mqttUserName);
           options.setPassword(mqttPassword.toCharArray());
        }
        factory.setConnectionOptions(options);
        return factory;
    }
@@ -40,24 +75,47 @@
        return new DirectChannel();
    }
    /**
     *
     * @return
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
                        mqttClientFactory(), "forward/test");
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    /**
     * 入站消息处理
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            recService.handle((String)message.getPayload());
           String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
           if(FEEDBACK_TOPIC.equals(topic) ) {
              logger.info("状态反馈消息={}",message.getPayload());
              Long workstationid = (Long)message.getPayload();
              workstationFeedbackService.executeWaitAnalyseFeedback(workstationid);
           }else if(COLLECT_DATA_TOPIC.equals(topic)) {
              logger.info("采集数据接收消息={}",message.getPayload());
              recService.handle((String)message.getPayload());
           }else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) {
              String workstationId = (String)message.getPayload();
              logger.info("工位创建接收消息={}",workstationId);
              stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
              recService.handle((String)message.getPayload());
           }else {//订阅了几个topic就会接收到几个,其他的不会进来
              logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload());
           }
        };
    }
@@ -72,7 +130,7 @@
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("forward/test");
        messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC);
        return messageHandler;
    }
}