package com.qianwen.mdc.collect.config;
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.cache.annotation.CachingConfigurerSupport;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
|
|
@Configuration
|
public class MqttConfig {
|
|
@Autowired
|
private IOTMqttReceiveService recService;
|
|
@Bean
|
public MqttPahoClientFactory mqttClientFactory() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setServerURIs(new String[] { "tcp://82.156.1.83:1884" });
|
// options.setUserName("your-username");
|
//options.setPassword("your-password".toCharArray());
|
factory.setConnectionOptions(options);
|
return factory;
|
}
|
|
@Bean
|
public MessageChannel mqttInputChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
public MessageProducer inbound() {
|
MqttPahoMessageDrivenChannelAdapter adapter =
|
new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
|
mqttClientFactory(), "forward/test");
|
adapter.setCompletionTimeout(5000);
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
adapter.setQos(1);
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
public MessageHandler handler() {
|
return message -> {
|
|
recService.handle((String)message.getPayload());
|
};
|
}
|
|
@Bean
|
public MessageChannel mqttOutboundChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = "mqttOutboundChannel")
|
public MessageHandler mqttOutbound() {
|
MqttPahoMessageHandler messageHandler =
|
new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());
|
messageHandler.setAsync(true);
|
messageHandler.setDefaultTopic("forward/test");
|
return messageHandler;
|
}
|
}
|