package com.qianwen.mdc.collect.config; import java.util.Arrays; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import com.qianwen.mdc.collect.service.DeviceStateFixPointService; import com.qianwen.mdc.collect.service.IOTMqttReceiveService; import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService; import cn.hutool.core.date.DateTime; import cn.hutool.core.util.ObjectUtil; @Configuration public class MqttConfig { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${mqtt.host}") private String mqttHost; @Value("${mqtt.username:}") private String mqttUserName; @Value("${mqtt.password:}") private String mqttPassword; @Autowired private IOTMqttReceiveService recService; @Autowired private DeviceStateFixPointService stateFixPointService; @Autowired private WorkstationFeedbackService workstationFeedbackService; public static final String COLLECT_DATA_TOPIC = "forward/test"; /** * 反馈创建的topic(mdc中),本应用接收并处理 */ public static final String FEEDBACK_TOPIC = "mdc/feedback"; public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" if(ObjectUtil.isNotEmpty(mqttUserName)) { options.setUserName(mqttUserName); options.setPassword(mqttPassword.toCharArray()); } factory.setConnectionOptions(options); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * * @return */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound", mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数 adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 入站消息处理 * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return message -> { String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); if(FEEDBACK_TOPIC.equals(topic) ) { logger.info("状态反馈消息={}",message.getPayload()); Long workstationid = (Long)message.getPayload(); workstationFeedbackService.executeWaitAnalyseFeedback(workstationid); }else if(COLLECT_DATA_TOPIC.equals(topic)) { logger.info("采集数据接收消息={}",message.getPayload()); recService.handle((String)message.getPayload()); }else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) { String workstationId = (String)message.getPayload(); logger.info("工位创建接收消息={}",workstationId); stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId)); recService.handle((String)message.getPayload()); }else {//订阅了几个topic就会接收到几个,其他的不会进来 logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload()); } }; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC); return messageHandler; } }