| | |
| | | package com.qianwen.mdc.collect.config; |
| | | |
| | | import java.util.Arrays; |
| | | |
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.cache.annotation.CachingConfigurerSupport; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.integration.annotation.ServiceActivator; |
| | |
| | | import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
| | | import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; |
| | | import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
| | | import org.springframework.integration.mqtt.support.MqttHeaders; |
| | | import org.springframework.messaging.MessageChannel; |
| | | import org.springframework.messaging.MessageHandler; |
| | | |
| | | import com.qianwen.mdc.collect.service.DeviceStateFixPointService; |
| | | import com.qianwen.mdc.collect.service.IOTMqttReceiveService; |
| | | import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService; |
| | | |
| | | import cn.hutool.core.date.DateTime; |
| | | import cn.hutool.core.util.ObjectUtil; |
| | | |
| | | @Configuration |
| | | public class MqttConfig { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger(this.getClass()); |
| | | @Value("${mqtt.host}") |
| | | private String mqttHost; |
| | | |
| | | @Value("${mqtt.username:}") |
| | | private String mqttUserName; |
| | | |
| | | @Value("${mqtt.password:}") |
| | | private String mqttPassword; |
| | | |
| | | @Autowired |
| | | private IOTMqttReceiveService recService; |
| | | @Autowired |
| | | private DeviceStateFixPointService stateFixPointService; |
| | | |
| | | @Autowired |
| | | private WorkstationFeedbackService workstationFeedbackService; |
| | | public static final String COLLECT_DATA_TOPIC = "forward/test"; |
| | | |
| | | /** |
| | | * 反馈创建的topic(mdc中),本应用接收并处理 |
| | | */ |
| | | public static final String FEEDBACK_TOPIC = "mdc/feedback"; |
| | | |
| | | public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; |
| | | |
| | | @Bean |
| | | public MqttPahoClientFactory mqttClientFactory() { |
| | | DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | options.setServerURIs(new String[] { "tcp://82.156.1.83:1884" }); |
| | | // options.setUserName("your-username"); |
| | | //options.setPassword("your-password".toCharArray()); |
| | | options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" |
| | | |
| | | if(ObjectUtil.isNotEmpty(mqttUserName)) { |
| | | options.setUserName(mqttUserName); |
| | | options.setPassword(mqttPassword.toCharArray()); |
| | | } |
| | | factory.setConnectionOptions(options); |
| | | return factory; |
| | | } |
| | |
| | | return new DirectChannel(); |
| | | } |
| | | |
| | | /** |
| | | * |
| | | * @return |
| | | */ |
| | | @Bean |
| | | public MessageProducer inbound() { |
| | | MqttPahoMessageDrivenChannelAdapter adapter = |
| | | new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound", |
| | | mqttClientFactory(), "forward/test"); |
| | | mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数 |
| | | adapter.setCompletionTimeout(5000); |
| | | adapter.setConverter(new DefaultPahoMessageConverter()); |
| | | adapter.setQos(1); |
| | | adapter.setOutputChannel(mqttInputChannel()); |
| | | return adapter; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 入站消息处理 |
| | | * @return |
| | | */ |
| | | @Bean |
| | | @ServiceActivator(inputChannel = "mqttInputChannel") |
| | | public MessageHandler handler() { |
| | | return message -> { |
| | | |
| | | recService.handle((String)message.getPayload()); |
| | | String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); |
| | | if(FEEDBACK_TOPIC.equals(topic) ) { |
| | | logger.info("状态反馈消息={}",message.getPayload()); |
| | | Long workstationid = (Long)message.getPayload(); |
| | | workstationFeedbackService.executeWaitAnalyseFeedback(workstationid); |
| | | }else if(COLLECT_DATA_TOPIC.equals(topic)) { |
| | | logger.info("采集数据接收消息={}",message.getPayload()); |
| | | recService.handle((String)message.getPayload()); |
| | | }else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) { |
| | | String workstationId = (String)message.getPayload(); |
| | | logger.info("工位创建接收消息={}",workstationId); |
| | | stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId)); |
| | | recService.handle((String)message.getPayload()); |
| | | }else {//订阅了几个topic就会接收到几个,其他的不会进来 |
| | | logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload()); |
| | | } |
| | | }; |
| | | } |
| | | |
| | |
| | | MqttPahoMessageHandler messageHandler = |
| | | new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory()); |
| | | messageHandler.setAsync(true); |
| | | messageHandler.setDefaultTopic("forward/test"); |
| | | messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC); |
| | | return messageHandler; |
| | | } |
| | | } |