From c27b939fa5fa6ce4d712f7e9ced2ad811d69d5ec Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 30 十月 2024 20:46:25 +0800
Subject: [PATCH] 去掉dnc部分

---
 smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java |   82 ++++++++++++++++++++++++++++++++++++----
 1 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
index c5a585f..0743b8c 100644
--- a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
+++ b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -1,6 +1,8 @@
 package com.qianwen.smartman.common.config;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -22,10 +24,20 @@
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 
-
+import com.alibaba.fastjson.JSONObject;
+import com.qianwen.core.websocket.distribute.MessageDO;
+import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
+import com.qianwen.smartman.common.cache.cps.WorkstationCache;
+import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
+import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
+import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
+import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
+import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
+import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
 
 import cn.hutool.core.date.DateTime;
 import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
 
 @Configuration
 public class MdcMqttConfig {
@@ -38,7 +50,16 @@
 
     @Value("${mqtt.password:}")
     private String mqttPassword;
-
+    @Value("${mqtt.isRecevieMsg:false}")
+    private boolean isRecevieMsg;
+    @Autowired
+    private RedisMessageDistributor messageDistributor;
+    
+    //@Autowired
+    //private SuperProcessParameterMapper processParamMapper;
+    
+    @Autowired
+    private RealTimeDataService realTimeDataService;
     public static final String DEFAULT_TOPIC = "mdc";
 	/**
 	 * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
@@ -47,6 +68,7 @@
 	
 	public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
 	
+	public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
 	@Bean
     public MqttPahoClientFactory mqttClientFactory() {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -70,19 +92,29 @@
      * 
      * @return
      */
-    /*
+    
     @Bean
     public MessageProducer inbound() {
-        MqttPahoMessageDrivenChannelAdapter adapter =
-                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
-                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+    	java.util.Random r = new java.util.Random();
+    	
+    	String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
+    	
+        MqttPahoMessageDrivenChannelAdapter adapter;
+        if(!this.isRecevieMsg) {
+        	adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                    mqttClientFactory(),DEFAULT_TOPIC);
+    	}else {
+    		adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                            mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+    	}
+        
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(1);
         adapter.setOutputChannel(mqttInputChannel());
         return adapter;
     }
-    */
+    
     
     /**
      * 鍏ョ珯娑堟伅澶勭悊
@@ -91,10 +123,42 @@
     @Bean
     @ServiceActivator(inputChannel = "mqttInputChannel")
     public MessageHandler handler() {
+    	
         return message -> {
         	String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
-        	if(FEEDBACK_TOPIC.equals(topic) ) {
-        	
+        	if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
+        		//瀹炴椂鏁版嵁鏉ヤ簡锛屾暟鎹粠collect鍙戦�佹潵
+        		String payload = (String)message.getPayload();
+        		logger.info("鎺ユ敹鍒癿qtt娑堟伅readtime,data={}",payload);//娑堟伅浣撳浣曞畾涔夛紝鏁版嵁鐐瑰悕绉帮紝鍊硷紝鏃堕棿锛岀敤json瀵硅薄浼犺繃鏉�
+        		//TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
+        		JSONObject payLoadJson = JSONObject.parseObject(payload);
+        		long workstationId = payLoadJson.getLong("workstationId");
+        		String name = payLoadJson.getString("name");//key锛�
+        		long time = payLoadJson.getLong("time");
+        		String v = payLoadJson.getString("value");
+        		
+        		//灏嗘暟鎹姞鍏ョ紦瀛�
+        		TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
+                WorkstationCache.setWorkstationRealTime(workstationId, name, telemetryDataResponseDTO);
+                
+                //鍙戦�亀ebsocket娑堟伅
+                RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
+                jsonWebSocketMessage.setId(""+workstationId);
+                
+                
+                //浠庣紦瀛橈紙鎴栫洿鎺ユ煡璇級鍔犺浇鏈�鏂扮殑鏁版嵁锛屽彂閫亀ebsocket
+                Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
+                
+                realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
+                
+                jsonWebSocketMessage.setData(map);
+                MessageDO messageDO = new MessageDO();
+                messageDO.setNeedBroadcast(false);
+                String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
+                logger.info("websockettxt={}",msgtxt);
+                messageDO.setMessageText(msgtxt);
+               
+                messageDistributor.distribute(messageDO);
         	}
         };
     }

--
Gitblit v1.9.3