package com.qianwen.mdc.collect.config;
|
|
import java.util.Arrays;
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
|
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
|
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
|
|
import cn.hutool.core.date.DateTime;
|
import cn.hutool.core.util.ObjectUtil;
|
|
@Configuration
|
public class MqttConfig {
|
private Logger logger = LoggerFactory.getLogger(this.getClass());
|
@Value("${mqtt.host}")
|
private String mqttHost;
|
|
@Value("${mqtt.username:}")
|
private String mqttUserName;
|
|
@Value("${mqtt.password:}")
|
private String mqttPassword;
|
|
@Autowired
|
private IOTMqttReceiveService recService;
|
@Autowired
|
private DeviceStateFixPointService stateFixPointService;
|
|
@Autowired
|
private WorkstationFeedbackService workstationFeedbackService;
|
public static final String COLLECT_DATA_TOPIC = "forward/test";
|
|
/**
|
* 反馈创建的topic(mdc中),本应用接收并处理
|
*/
|
public static final String FEEDBACK_TOPIC = "mdc/feedback";
|
|
public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
|
|
@Bean
|
public MqttPahoClientFactory mqttClientFactory() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884"
|
|
if(ObjectUtil.isNotEmpty(mqttUserName)) {
|
options.setUserName(mqttUserName);
|
options.setPassword(mqttPassword.toCharArray());
|
}
|
factory.setConnectionOptions(options);
|
return factory;
|
}
|
|
@Bean
|
public MessageChannel mqttInputChannel() {
|
return new DirectChannel();
|
}
|
|
/**
|
*
|
* @return
|
*/
|
@Bean
|
public MessageProducer inbound() {
|
java.util.Random r = new java.util.Random();
|
|
String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
|
MqttPahoMessageDrivenChannelAdapter adapter =
|
new MqttPahoMessageDrivenChannelAdapter(clientId,
|
mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数
|
adapter.setCompletionTimeout(5000);
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
adapter.setQos(1);
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
|
/**
|
* 入站消息处理
|
* @return
|
*/
|
@Bean
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
public MessageHandler handler() {
|
return message -> {
|
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
|
if(FEEDBACK_TOPIC.equals(topic) ) {
|
logger.info("状态反馈消息={}",message.getPayload());
|
Long workstationid = (Long)message.getPayload();
|
workstationFeedbackService.executeWaitAnalyseFeedback(workstationid);
|
}else if(COLLECT_DATA_TOPIC.equals(topic)) {
|
logger.info("采集数据接收消息={}",message.getPayload());
|
recService.handle((String)message.getPayload());
|
}else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) {
|
String workstationId = (String)message.getPayload();
|
logger.info("工位创建接收消息={}",workstationId);
|
stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
|
recService.handle((String)message.getPayload());
|
}else {//订阅了几个topic就会接收到几个,其他的不会进来
|
logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload());
|
}
|
};
|
}
|
|
@Bean
|
public MessageChannel mqttOutboundChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = "mqttOutboundChannel")
|
public MessageHandler mqttOutbound() {
|
MqttPahoMessageHandler messageHandler =
|
new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());
|
messageHandler.setAsync(true);
|
messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC);
|
return messageHandler;
|
}
|
}
|