From 9faa74e1912022dc6e54c3e93426946876b5d83a Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期五, 25 十月 2024 10:49:13 +0800
Subject: [PATCH] 修改redistemplate的hashkey serializer,更方便客户端查看redis的数据

---
 collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java |  103 ++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 92 insertions(+), 11 deletions(-)

diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
index 0a3f1d9..e96c3f6 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -1,8 +1,12 @@
 package com.qianwen.mdc.collect.config;
 
+import java.util.Arrays;
+
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CachingConfigurerSupport;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.integration.annotation.ServiceActivator;
@@ -13,24 +17,67 @@
 import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 
+import com.alibaba.fastjson.JSONObject;
+import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
 import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
+import com.qianwen.mdc.collect.service.WorkstationDatapointsService;
+import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
+
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.util.ObjectUtil;
 
 @Configuration
 public class MqttConfig {
-	
+	private Logger logger = LoggerFactory.getLogger(this.getClass());
+	@Value("${mqtt.host}")
+    private String mqttHost;
+
+    @Value("${mqtt.username:}")
+    private String mqttUserName;
+
+    @Value("${mqtt.password:}")
+    private String mqttPassword;
 	@Autowired
 	private IOTMqttReceiveService recService;
+	@Autowired
+    private DeviceStateFixPointService stateFixPointService;
+	@Autowired
+    private WorkstationFeedbackService workstationFeedbackService;
+	
+	@Autowired
+    private WorkstationDatapointsService dpService;
+	/**
+	 * 鎺ユ敹鏁版嵁鐨刴qtt topic锛屽湪IOT骞冲彴閰嶇疆鐨�
+	 */
+	@Value("${mqtt.dataReceiveTopic:}")
+	public String COLLECT_DATA_TOPIC;
+	
+	/**
+	 * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
+	 */
+	public static final String FEEDBACK_TOPIC = "mdc/feedback";
+	
+	public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
+	
+	/**
+	 * 宸ヤ綅鏁版嵁鐐瑰彉鍖�
+	 */
+	private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed";
 	
 	@Bean
     public MqttPahoClientFactory mqttClientFactory() {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setServerURIs(new String[] { "tcp://82.156.1.83:1884" });
-       // options.setUserName("your-username");
-        //options.setPassword("your-password".toCharArray());
+        options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" 
+        
+        if(ObjectUtil.isNotEmpty(mqttUserName)) {
+        	options.setUserName(mqttUserName);
+        	options.setPassword(mqttPassword.toCharArray());
+        }
         factory.setConnectionOptions(options);
         return factory;
     }
@@ -40,24 +87,58 @@
         return new DirectChannel();
     }
 
+    /**
+     * 
+     * @return
+     */
     @Bean
     public MessageProducer inbound() {
+    	java.util.Random r = new java.util.Random();
+    	
+    	String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
         MqttPahoMessageDrivenChannelAdapter adapter =
-                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
-                        mqttClientFactory(), "forward/test");
+                new MqttPahoMessageDrivenChannelAdapter(clientId,
+                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(1);
         adapter.setOutputChannel(mqttInputChannel());
         return adapter;
     }
-
+    
+    
+    /**
+     * 鍏ョ珯娑堟伅澶勭悊
+     * @return
+     */
     @Bean
     @ServiceActivator(inputChannel = "mqttInputChannel")
     public MessageHandler handler() {
         return message -> {
-            
-            recService.handle((String)message.getPayload());
+        	String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
+        	if(FEEDBACK_TOPIC.equals(topic) ) {
+        		logger.info("鐘舵�佸弽棣堟秷鎭�={}",message.getPayload());
+        		Long workstationid = (Long)message.getPayload();
+        		workstationFeedbackService.executeWaitAnalyseFeedback(workstationid);
+        	}else if(COLLECT_DATA_TOPIC.equals(topic)) {
+        		logger.info("閲囬泦鏁版嵁鎺ユ敹娑堟伅={}",message.getPayload());
+        		recService.handle((String)message.getPayload());
+        	}else if(WOCKSTATION_CREATE_TOPIC.equals(topic)) {
+        		String workstationId = (String)message.getPayload();
+        		logger.info("宸ヤ綅鍒涘缓鎺ユ敹娑堟伅={}",workstationId);
+        		stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
+        		recService.handle((String)message.getPayload());
+        	}else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) {
+        		String payload = (String)message.getPayload();
+        		logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",payload);
+        		//workstationAppMappingService.saveToCache();
+        		//娓呴櫎璇ュ伐浣嶇殑鏁版嵁鐐圭紦瀛�
+        		JSONObject payloadObj = JSONObject.parseObject(payload);
+        		//payloadObj.getLong("workstationId");
+        		dpService.datapointsCacheEvict(payloadObj.getString("appId"));
+        	} else {//璁㈤槄浜嗗嚑涓猼opic灏变細鎺ユ敹鍒板嚑涓紝鍏朵粬鐨勪笉浼氳繘鏉�
+        		logger.warn("topic={},msg={},鏃犲搴旂殑澶勭悊鍣�",topic,message.getPayload());
+        	}
         };
     }
 
@@ -72,7 +153,7 @@
         MqttPahoMessageHandler messageHandler =
                 new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());
         messageHandler.setAsync(true);
-        messageHandler.setDefaultTopic("forward/test");
+        messageHandler.setDefaultTopic(COLLECT_DATA_TOPIC);
         return messageHandler;
     }
 }
\ No newline at end of file

--
Gitblit v1.9.3