From d728f14a2f23cb477ebfecd33df5f7e5cb54a178 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期二, 29 十月 2024 17:33:22 +0800
Subject: [PATCH] 去掉rocketmq

---
 smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java |   60 +++++++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
index 632bb43..c2681ee 100644
--- a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
+++ b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -2,6 +2,7 @@
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -29,6 +30,10 @@
 import com.qianwen.smartman.common.cache.cps.WorkstationCache;
 import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
 import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
+import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
+import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
+import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
+import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
 
 import cn.hutool.core.date.DateTime;
 import cn.hutool.core.util.ObjectUtil;
@@ -45,10 +50,16 @@
 
     @Value("${mqtt.password:}")
     private String mqttPassword;
-
+    @Value("${mqtt.isRecevieMsg:false}")
+    private boolean isRecevieMsg;
     @Autowired
     private RedisMessageDistributor messageDistributor;
     
+    //@Autowired
+    //private SuperProcessParameterMapper processParamMapper;
+    
+    @Autowired
+    private RealTimeDataService realTimeDataService;
     public static final String DEFAULT_TOPIC = "mdc";
 	/**
 	 * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
@@ -88,9 +99,15 @@
     	
     	String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
     	
-        MqttPahoMessageDrivenChannelAdapter adapter =
-                new MqttPahoMessageDrivenChannelAdapter(clientId,
-                        mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+        MqttPahoMessageDrivenChannelAdapter adapter;
+        if(!this.isRecevieMsg) {
+        	adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                    mqttClientFactory(),DEFAULT_TOPIC);
+    	}else {
+    		adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                            mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+    	}
+        
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(1);
@@ -106,6 +123,7 @@
     @Bean
     @ServiceActivator(inputChannel = "mqttInputChannel")
     public MessageHandler handler() {
+    	
         return message -> {
         	String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
         	if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
@@ -113,11 +131,11 @@
         		String payload = (String)message.getPayload();
         		logger.info("鎺ユ敹鍒癿qtt娑堟伅readtime,data={}",payload);//娑堟伅浣撳浣曞畾涔夛紝鏁版嵁鐐瑰悕绉帮紝鍊硷紝鏃堕棿锛岀敤json瀵硅薄浼犺繃鏉�
         		//TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
-        		JSONObject data = JSONObject.parseObject(payload);
-        		long workstationId = data.getLong("workstationId");
-        		String name = data.getString("name");//key锛�
-        		long time = data.getLong("time");
-        		String v = data.getString("value");
+        		JSONObject payLoadJson = JSONObject.parseObject(payload);
+        		long workstationId = payLoadJson.getLong("workstationId");
+        		String name = payLoadJson.getString("name");//key锛�
+        		long time = payLoadJson.getLong("time");
+        		String v = payLoadJson.getString("value");
         		
         		//灏嗘暟鎹姞鍏ョ紦瀛�
         		TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
@@ -126,17 +144,21 @@
                 //鍙戦�亀ebsocket娑堟伅
                 RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
                 jsonWebSocketMessage.setId(""+workstationId);
-                jsonWebSocketMessage.setData(new HashMap<String, Object>() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1
-                    {
-                        put(name, telemetryDataResponseDTO);
-                    }
-                });
-                //RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class);
-                MessageDO messageDO = new MessageDO();
-                messageDO.setNeedBroadcast(Boolean.FALSE);
-                messageDO.setMessageText(JSONUtil.toJsonStr(jsonWebSocketMessage));
-                messageDistributor.distribute(messageDO);
                 
+                
+                //浠庣紦瀛橈紙鎴栫洿鎺ユ煡璇級鍔犺浇鏈�鏂扮殑鏁版嵁锛屽彂閫亀ebsocket
+                Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
+                
+                realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
+                
+                jsonWebSocketMessage.setData(map);
+                MessageDO messageDO = new MessageDO();
+                messageDO.setNeedBroadcast(false);
+                String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
+                logger.info("websockettxt={}",msgtxt);
+                messageDO.setMessageText(msgtxt);
+               
+                messageDistributor.distribute(messageDO);
         	}
         };
     }

--
Gitblit v1.9.3