yangys
2024-09-27 26f8e5990686bdba2119024a260d986266506757
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package com.qianwen.mdc.collect.config;
 
import java.util.Arrays;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
 
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
 
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
 
@Configuration
public class MqttConfig {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${mqtt.host}")
    private String mqttHost;
 
    @Value("${mqtt.username:}")
    private String mqttUserName;
 
    @Value("${mqtt.password:}")
    private String mqttPassword;
 
    @Autowired
    private IOTMqttReceiveService recService;
    @Autowired
    private DeviceStateFixPointService stateFixPointService;
    
    @Autowired
    private WorkstationFeedbackService workstationFeedbackService;
    public static final String COLLECT_DATA_TOPIC = "forward/test";
    
    /**
     * 反馈创建的topic(mdc中),本应用接收并处理
     */
    public static final String FEEDBACK_TOPIC = "mdc/feedback";
    
    public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" 
        
        if(ObjectUtil.isNotEmpty(mqttUserName)) {
            options.setUserName(mqttUserName);
            options.setPassword(mqttPassword.toCharArray());
        }
        factory.setConnectionOptions(options);
        return factory;
    }
 
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    /**
     * 
     * @return
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    
    
    /**
     * 入站消息处理
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
            if(FEEDBACK_TOPIC.equals(topic) ) {
                logger.info("状态反馈消息={}",message.getPayload());
                Long workstationid = (Long)message.getPayload();
                workstationFeedbackService.executeWaitAnalyseFeedback(workstationid);
            }else if(COLLECT_DATA_TOPIC.equals(topic)) {
                logger.info("采集数据接收消息={}",message.getPayload());
                recService.handle((String)message.getPayload());
            }else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) {
                String workstationId = (String)message.getPayload();
                logger.info("工位创建接收消息={}",workstationId);
                stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
                recService.handle((String)message.getPayload());
            }else {//订阅了几个topic就会接收到几个,其他的不会进来
                logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload());
            }
        };
    }
 
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC);
        return messageHandler;
    }
}