From c27b939fa5fa6ce4d712f7e9ced2ad811d69d5ec Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 30 十月 2024 20:46:25 +0800
Subject: [PATCH] 去掉dnc部分
---
smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java | 62 +++++++++++++++++++++----------
1 files changed, 42 insertions(+), 20 deletions(-)
diff --git a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
index 632bb43..0743b8c 100644
--- a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
+++ b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java
@@ -2,6 +2,7 @@
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@@ -28,7 +29,11 @@
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
-import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO;
+import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
+import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
+import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
+import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
+import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.ObjectUtil;
@@ -45,10 +50,16 @@
@Value("${mqtt.password:}")
private String mqttPassword;
-
+ @Value("${mqtt.isRecevieMsg:false}")
+ private boolean isRecevieMsg;
@Autowired
private RedisMessageDistributor messageDistributor;
+ //@Autowired
+ //private SuperProcessParameterMapper processParamMapper;
+
+ @Autowired
+ private RealTimeDataService realTimeDataService;
public static final String DEFAULT_TOPIC = "mdc";
/**
* 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
@@ -88,9 +99,15 @@
String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
- MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter(clientId,
- mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+ MqttPahoMessageDrivenChannelAdapter adapter;
+ if(!this.isRecevieMsg) {
+ adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+ mqttClientFactory(),DEFAULT_TOPIC);
+ }else {
+ adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+ mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+ }
+
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
@@ -106,6 +123,7 @@
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
+
return message -> {
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
@@ -113,11 +131,11 @@
String payload = (String)message.getPayload();
logger.info("鎺ユ敹鍒癿qtt娑堟伅readtime,data={}",payload);//娑堟伅浣撳浣曞畾涔夛紝鏁版嵁鐐瑰悕绉帮紝鍊硷紝鏃堕棿锛岀敤json瀵硅薄浼犺繃鏉�
//TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
- JSONObject data = JSONObject.parseObject(payload);
- long workstationId = data.getLong("workstationId");
- String name = data.getString("name");//key锛�
- long time = data.getLong("time");
- String v = data.getString("value");
+ JSONObject payLoadJson = JSONObject.parseObject(payload);
+ long workstationId = payLoadJson.getLong("workstationId");
+ String name = payLoadJson.getString("name");//key锛�
+ long time = payLoadJson.getLong("time");
+ String v = payLoadJson.getString("value");
//灏嗘暟鎹姞鍏ョ紦瀛�
TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
@@ -126,17 +144,21 @@
//鍙戦�亀ebsocket娑堟伅
RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
jsonWebSocketMessage.setId(""+workstationId);
- jsonWebSocketMessage.setData(new HashMap<String, Object>() { // from class: org.springblade.modules.cps.message.consumer.TelemetryDataRealTimeConsumer.1
- {
- put(name, telemetryDataResponseDTO);
- }
- });
- //RedisMessageDistributor messageDistributor = (RedisMessageDistributor) SpringUtil.getBean(RedisMessageDistributor.class);
- MessageDO messageDO = new MessageDO();
- messageDO.setNeedBroadcast(Boolean.FALSE);
- messageDO.setMessageText(JSONUtil.toJsonStr(jsonWebSocketMessage));
- messageDistributor.distribute(messageDO);
+
+ //浠庣紦瀛橈紙鎴栫洿鎺ユ煡璇級鍔犺浇鏈�鏂扮殑鏁版嵁锛屽彂閫亀ebsocket
+ Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
+
+ realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
+
+ jsonWebSocketMessage.setData(map);
+ MessageDO messageDO = new MessageDO();
+ messageDO.setNeedBroadcast(false);
+ String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
+ logger.info("websockettxt={}",msgtxt);
+ messageDO.setMessageText(msgtxt);
+
+ messageDistributor.distribute(messageDO);
}
};
}
--
Gitblit v1.9.3