From 7ef593e1e3c35aaeecf9318f0b3941230d3ed002 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 09 十月 2024 11:22:54 +0800
Subject: [PATCH] 增加在数据点计算规则后数据点名称加_n的适配
---
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java | 83 ++++++++++++++++++++++++++++++++++++-----
1 files changed, 72 insertions(+), 11 deletions(-)
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
index 0a3f1d9..ec769d0 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -1,8 +1,12 @@
package com.qianwen.mdc.collect.config;
+import java.util.Arrays;
+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CachingConfigurerSupport;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
@@ -13,24 +17,55 @@
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
+import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
+import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
+
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.util.ObjectUtil;
@Configuration
public class MqttConfig {
-
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+ @Value("${mqtt.host}")
+ private String mqttHost;
+
+ @Value("${mqtt.username:}")
+ private String mqttUserName;
+
+ @Value("${mqtt.password:}")
+ private String mqttPassword;
+
@Autowired
private IOTMqttReceiveService recService;
+ @Autowired
+ private DeviceStateFixPointService stateFixPointService;
+
+ @Autowired
+ private WorkstationFeedbackService workstationFeedbackService;
+ public static final String COLLECT_DATA_TOPIC = "forward/test";
+
+ /**
+ * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
+ */
+ public static final String FEEDBACK_TOPIC = "mdc/feedback";
+
+ public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
- options.setServerURIs(new String[] { "tcp://82.156.1.83:1884" });
- // options.setUserName("your-username");
- //options.setPassword("your-password".toCharArray());
+ options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884"
+
+ if(ObjectUtil.isNotEmpty(mqttUserName)) {
+ options.setUserName(mqttUserName);
+ options.setPassword(mqttPassword.toCharArray());
+ }
factory.setConnectionOptions(options);
return factory;
}
@@ -40,24 +75,50 @@
return new DirectChannel();
}
+ /**
+ *
+ * @return
+ */
@Bean
public MessageProducer inbound() {
+ java.util.Random r = new java.util.Random();
+
+ String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
- mqttClientFactory(), "forward/test");
+ new MqttPahoMessageDrivenChannelAdapter(clientId,
+ mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
-
+
+
+ /**
+ * 鍏ョ珯娑堟伅澶勭悊
+ * @return
+ */
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
-
- recService.handle((String)message.getPayload());
+ String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
+ if(FEEDBACK_TOPIC.equals(topic) ) {
+ logger.info("鐘舵�佸弽棣堟秷鎭�={}",message.getPayload());
+ Long workstationid = (Long)message.getPayload();
+ workstationFeedbackService.executeWaitAnalyseFeedback(workstationid);
+ }else if(COLLECT_DATA_TOPIC.equals(topic)) {
+ logger.info("閲囬泦鏁版嵁鎺ユ敹娑堟伅={}",message.getPayload());
+ recService.handle((String)message.getPayload());
+ }else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) {
+ String workstationId = (String)message.getPayload();
+ logger.info("宸ヤ綅鍒涘缓鎺ユ敹娑堟伅={}",workstationId);
+ stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
+ recService.handle((String)message.getPayload());
+ }else {//璁㈤槄浜嗗嚑涓猼opic灏变細鎺ユ敹鍒板嚑涓紝鍏朵粬鐨勪笉浼氳繘鏉�
+ logger.warn("topic={},msg={},鏃犲搴旂殑澶勭悊鍣�",topic,message.getPayload());
+ }
};
}
@@ -72,7 +133,7 @@
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());
messageHandler.setAsync(true);
- messageHandler.setDefaultTopic("forward/test");
+ messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC);
return messageHandler;
}
}
\ No newline at end of file
--
Gitblit v1.9.3