From fe82f1f9a9be911d1420fe3b018ea85dd5fff1a3 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期四, 21 十一月 2024 21:22:58 +0800
Subject: [PATCH] 代码整理

---
 collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java |   41 +++++++++++++++++++++++++++++------------
 1 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
index df16c29..cfbba4d 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -21,9 +21,10 @@
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 
+import com.alibaba.fastjson.JSONObject;
 import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
 import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
-import com.qianwen.mdc.collect.service.WorkstationAppMappingService;
+import com.qianwen.mdc.collect.service.WorkstationDatapointsService;
 import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
 
 import cn.hutool.core.date.DateTime;
@@ -40,34 +41,46 @@
 
     @Value("${mqtt.password:}")
     private String mqttPassword;
-
+    
+    @Value("${mqtt.timeout:1000}")
+    private int timeout;
+    
 	@Autowired
 	private IOTMqttReceiveService recService;
 	@Autowired
     private DeviceStateFixPointService stateFixPointService;
-	
 	@Autowired
     private WorkstationFeedbackService workstationFeedbackService;
 	
 	@Autowired
-    private WorkstationAppMappingService workstationAppMappingService;
-	
-	public static final String COLLECT_DATA_TOPIC = "forward/test";
+    private WorkstationDatapointsService dpService;
+	/**
+	 * 鎺ユ敹鏁版嵁鐨刴qtt topic锛屽湪IOT骞冲彴閰嶇疆鐨�
+	 */
+	@Value("${mqtt.dataReceiveTopic:}")
+	public String COLLECT_DATA_TOPIC;
 	
 	/**
 	 * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
 	 */
 	public static final String FEEDBACK_TOPIC = "mdc/feedback";
 	
+	/**
+	 * 宸ヤ綅鍒涘缓娑堟伅锛宮dc涓柊寤哄伐浣嶆椂鍙戦�侊紝鏈簲鐢ㄦ帴鏀跺苟澶勭悊
+	 */
 	public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
 	
-	private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed";
+	/**
+	 * 宸ヤ綅鏁版嵁鐐瑰彉鍖�
+	 */
+	private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed";
 	
 	@Bean
     public MqttPahoClientFactory mqttClientFactory() {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();
         options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" 
+        options.setConnectionTimeout(timeout);
         
         if(ObjectUtil.isNotEmpty(mqttUserName)) {
         	options.setUserName(mqttUserName);
@@ -93,7 +106,7 @@
     	String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
         MqttPahoMessageDrivenChannelAdapter adapter =
                 new MqttPahoMessageDrivenChannelAdapter(clientId,
-                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(1);
@@ -123,10 +136,14 @@
         		logger.info("宸ヤ綅鍒涘缓鎺ユ敹娑堟伅={}",workstationId);
         		stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
         		recService.handle((String)message.getPayload());
-        	}else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) {
-        		String workstationId = (String)message.getPayload();
-        		logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",workstationId);
-        		workstationAppMappingService.saveToCache();
+        	}else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) {
+        		String payload = (String)message.getPayload();
+        		logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",payload);
+        		//workstationAppMappingService.saveToCache();
+        		//娓呴櫎璇ュ伐浣嶇殑鏁版嵁鐐圭紦瀛�
+        		JSONObject payloadObj = JSONObject.parseObject(payload);
+        		//payloadObj.getLong("workstationId");
+        		dpService.datapointsCacheEvict(payloadObj.getString("appId"));
         	} else {//璁㈤槄浜嗗嚑涓猼opic灏变細鎺ユ敹鍒板嚑涓紝鍏朵粬鐨勪笉浼氳繘鏉�
         		logger.warn("topic={},msg={},鏃犲搴旂殑澶勭悊鍣�",topic,message.getPayload());
         	}

--
Gitblit v1.9.3