From c27b939fa5fa6ce4d712f7e9ced2ad811d69d5ec Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 30 十月 2024 20:46:25 +0800
Subject: [PATCH] 去掉dnc部分
---
smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java | 82 ++++++++++++++++++++++++++++++++++++----
1 files changed, 73 insertions(+), 9 deletions(-)
diff --git a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
index c5a585f..0743b8c 100644
--- a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
+++ b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -1,6 +1,8 @@
package com.qianwen.smartman.common.config;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -22,10 +24,20 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
-
+import com.alibaba.fastjson.JSONObject;
+import com.qianwen.core.websocket.distribute.MessageDO;
+import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
+import com.qianwen.smartman.common.cache.cps.WorkstationCache;
+import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
+import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
+import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
+import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
+import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
+import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
@Configuration
public class MdcMqttConfig {
@@ -38,7 +50,16 @@
@Value("${mqtt.password:}")
private String mqttPassword;
-
+ @Value("${mqtt.isRecevieMsg:false}")
+ private boolean isRecevieMsg;
+ @Autowired
+ private RedisMessageDistributor messageDistributor;
+
+ //@Autowired
+ //private SuperProcessParameterMapper processParamMapper;
+
+ @Autowired
+ private RealTimeDataService realTimeDataService;
public static final String DEFAULT_TOPIC = "mdc";
/**
* 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
@@ -47,6 +68,7 @@
public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
+ public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -70,19 +92,29 @@
*
* @return
*/
- /*
+
@Bean
public MessageProducer inbound() {
- MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",
- mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+ java.util.Random r = new java.util.Random();
+
+ String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
+
+ MqttPahoMessageDrivenChannelAdapter adapter;
+ if(!this.isRecevieMsg) {
+ adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+ mqttClientFactory(),DEFAULT_TOPIC);
+ }else {
+ adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+ mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+ }
+
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
- */
+
/**
* 鍏ョ珯娑堟伅澶勭悊
@@ -91,10 +123,42 @@
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
+
return message -> {
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
- if(FEEDBACK_TOPIC.equals(topic) ) {
-
+ if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
+ //瀹炴椂鏁版嵁鏉ヤ簡锛屾暟鎹粠collect鍙戦�佹潵
+ String payload = (String)message.getPayload();
+ logger.info("鎺ユ敹鍒癿qtt娑堟伅readtime,data={}",payload);//娑堟伅浣撳浣曞畾涔夛紝鏁版嵁鐐瑰悕绉帮紝鍊硷紝鏃堕棿锛岀敤json瀵硅薄浼犺繃鏉�
+ //TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
+ JSONObject payLoadJson = JSONObject.parseObject(payload);
+ long workstationId = payLoadJson.getLong("workstationId");
+ String name = payLoadJson.getString("name");//key锛�
+ long time = payLoadJson.getLong("time");
+ String v = payLoadJson.getString("value");
+
+ //灏嗘暟鎹姞鍏ョ紦瀛�
+ TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
+ WorkstationCache.setWorkstationRealTime(workstationId, name, telemetryDataResponseDTO);
+
+ //鍙戦�亀ebsocket娑堟伅
+ RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
+ jsonWebSocketMessage.setId(""+workstationId);
+
+
+ //浠庣紦瀛橈紙鎴栫洿鎺ユ煡璇級鍔犺浇鏈�鏂扮殑鏁版嵁锛屽彂閫亀ebsocket
+ Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
+
+ realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
+
+ jsonWebSocketMessage.setData(map);
+ MessageDO messageDO = new MessageDO();
+ messageDO.setNeedBroadcast(false);
+ String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
+ logger.info("websockettxt={}",msgtxt);
+ messageDO.setMessageText(msgtxt);
+
+ messageDistributor.distribute(messageDO);
}
};
}
--
Gitblit v1.9.3