From 11d4be720620abf502d35000e2ed40d30c4023bf Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期一, 24 十一月 2025 16:33:34 +0800
Subject: [PATCH] 修复离线时间展示

---
 collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java |  188 +++++++++++++++++++++++++++++++---------------
 1 files changed, 126 insertions(+), 62 deletions(-)

diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index ec40308..73fe491 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,7 +4,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.stream.Collectors;
 
 import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
@@ -20,11 +19,13 @@
 import org.springframework.stereotype.Service;
 
 import com.alibaba.fastjson.JSONObject;
+import com.qianwen.core.tool.utils.ObjectUtil;
 import com.qianwen.mdc.collect.config.IotDBSessionConfig;
 import com.qianwen.mdc.collect.constants.IOTDBConstant;
 import com.qianwen.mdc.collect.domain.TelemetryData;
 import com.qianwen.mdc.collect.domain.TelemetryDataItem;
-import com.qianwen.mdc.collect.utils.redis.RedisUtil;
+import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
 
 /**
  * 閲囬泦鏁版嵁澶勭悊鍏ュ簱
@@ -33,12 +34,19 @@
 public class CollectDataService {
 	private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
 	
-	//private String DB_PREFIX = "root.f2.";
 	private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
 	@Autowired
 	private IotDBSessionConfig iotdbConfig;
 	@Autowired
 	private IotDBCommonService iotDBCommonService;
+	@Autowired
+	private MqttMessageSender mqttMessageSender;
+	
+	
+	/**
+	 * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚�
+	 */
+	public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
 	
 	private static String TEMPLATE_NAME = "process_param";
 
@@ -56,32 +64,69 @@
 	public void handleCollectData(List<TelemetryData> telemetryDataList) {
 
 		for (TelemetryData dt : telemetryDataList) {
+			
 			handleOneWorkstation(dt);
+			
+			sendRealtimeDataMsg(dt);
 		}
 
 	}
 
+	void sendRealtimeDataMsg(TelemetryData dt) {
+		if(ObjectUtil.isEmpty(dt.getDataItems())){
+			return;
+		}
+		
+		List<TypedTelemetryData> dataList= new ArrayList<>();
+		String propertyName;
+		for (TelemetryDataItem dataItem : dt.getDataItems()) {
+
+			for (Map<String, String> point : dataItem.getDataPoints()) {
+		        
+		        String[] keys = point.keySet().toArray(new String[0]);
+		        for(int i=0;i<keys.length;i++) {
+		        	TypedTelemetryData tpData = new TypedTelemetryData();
+		        	propertyName = keys[i];
+		        	tpData.setTime(dataItem.getTime());
+		        	tpData.setName(propertyName);
+		        	tpData.setValue(point.get(propertyName));
+		        	
+		        	dataList.add(tpData);
+		        }
+		  
+			}
+			
+		}
+		
+		//鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡
+		for(TypedTelemetryData item : dataList) {
+			JSONObject json = new JSONObject();
+			json.put("workstationId",dt.getWorkstationId());
+			json.put("name", item.getName());
+			json.put("value", item.getValue());
+			json.put("time", item.getTime());
+			
+			mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
+		}
+		
+	}
 	/**
 	 * 澶勭悊涓�涓伐浣嶇殑鏁版嵁瑙f瀽鍏ュ簱
 	 * @param dt
 	 */
 	void handleOneWorkstation(TelemetryData dt) {
-		String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
-
-		// 鎸傝浇妯℃澘
-		//iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
-
-		List<MeasurementSchema> schemas = new ArrayList<>();
+		String deviceId;
 		
+		//long workstationId = dpVo.getWorkstationId();
+		
+		List<MeasurementSchema> schemas = new ArrayList<>();
 		schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
 		schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
 		schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
 		
-		
 		int rowIndex = 0;
 		
 		Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
-		
 		
 		String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
 		String name;
@@ -94,24 +139,45 @@
 			iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
 			Tablet tablet = new Tablet(deviceId, schemas);
 			for(TypedTelemetryData tdata : typeList) {
+				
 				rowIndex = tablet.rowSize++;
 				tablet.addTimestamp(rowIndex, tdata.getTime());
 				tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
 				tablet.addValue("n",rowIndex,tdata.getName());
-	        	tablet.addValue("v",rowIndex,tdata.getValue());
+				if(!tdata.getName().equals("Alarm")) {
+					tablet.addValue("v",rowIndex,tdata.getValue());
+				}else {
+					//鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡涓簀son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+					tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
+				}
 			}
 			
 			try {
 				iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-				
-				updateLastParam(dt.getWorkstationId(),typeList);
+				//updateLastParam(dt.getWorkstationId(),typeList);
 			} catch (Exception e) {
 				log.error("IOTDB鍏ュ簱澶辫触",e);
-				e.printStackTrace();
+			}finally {
+				//iotdbConfig.getSessionPool().clo1se();
 			}
 		}
 		
 
+	}
+	
+	/**
+	 * 灏嗘姤璀︿俊鎭牸寮忓寲鏈猨son瀵硅薄(鍘熺増鏄痡sonobj鎴栬�卝son鏁扮粍)锛屾牸寮忎负  {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+	 * @param collectAlarmValue
+	 * @return
+	 */
+	String formatAlarmMsg(long time,String collectAlarmValue){
+		JSONObject alarmObj = new JSONObject();
+		alarmObj.put("timestamp", time);
+		alarmObj.put("code", "00");
+		alarmObj.put("msg", collectAlarmValue);
+		alarmObj.put("alarmtype", 0);
+		alarmObj.put("level", "");
+		return alarmObj.toJSONString();
 	}
 	
 	/**
@@ -132,64 +198,62 @@
 		schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
 		
 		Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
-		for(TypedTelemetryData tdata: typeList) {
-			
-		}
-	
+
 		
 		String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
-		SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
 		
-		if(dsw.hasNext()) {
-			RowRecord rec = dsw.next();
-			long time = rec.getTimestamp();
-			
-			String paramJsonStr = rec.getFields().get(2).getStringValue();
-			
-			tablet.rowSize = 1;
-			tablet.addTimestamp(0, time);
-			tablet.addValue("update_time", 0, updateTime);
-			tablet.addValue("workstation_id", 0, workstationId);
-			JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
-			for(TypedTelemetryData tdata: typeList) {
+		try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
+			if(dsw.hasNext()) {
+				RowRecord rec = dsw.next();
+				long time = rec.getTimestamp();
 				
-				if(paramObj.containsKey(tdata.getName())) {
-					JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
-					itemObj.put("value", tdata.getValue());
-					itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
-					paramObj.put(tdata.getName(), itemObj);
-				}else {
+				String paramJsonStr = rec.getFields().get(2).getStringValue();
+				
+				tablet.rowSize = 1;
+				tablet.addTimestamp(0, time);
+				tablet.addValue("update_time", 0, updateTime);
+				tablet.addValue("workstation_id", 0, workstationId);
+				JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
+				for(TypedTelemetryData tdata: typeList) {
+					
+					if(paramObj.containsKey(tdata.getName())) {
+						JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
+						itemObj.put("value", tdata.getValue());
+						itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+						paramObj.put(tdata.getName(), itemObj);
+					}else {
+						JSONObject itemObj = new JSONObject();
+						itemObj.put("value", tdata.getValue());
+						itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+						paramObj.put(tdata.getName(), itemObj);
+						
+					}
+				}
+				tablet.addValue("param_json", 0, paramObj.toJSONString());
+				
+				this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+	
+			}else {
+				//娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
+				tablet.rowSize = 1;
+				
+				tablet.addTimestamp(0, updateTime);
+				tablet.addValue("update_time", 0, updateTime);
+				tablet.addValue("workstation_id", 0, workstationId);
+				
+				JSONObject paramObj = new JSONObject();
+				for(TypedTelemetryData tdata: typeList) {
 					JSONObject itemObj = new JSONObject();
 					itemObj.put("value", tdata.getValue());
 					itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
 					paramObj.put(tdata.getName(), itemObj);
-					
 				}
+	
+				tablet.addValue("param_json", 0,paramObj.toJSONString());
+				this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
 			}
-			tablet.addValue("param_json", 0, paramObj.toJSONString());
-			
-			this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-
-		}else {
-			//娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
-			tablet.rowSize = 1;
-			
-			tablet.addTimestamp(0, updateTime);
-			tablet.addValue("update_time", 0, updateTime);
-			tablet.addValue("workstation_id", 0, workstationId);
-			
-			JSONObject paramObj = new JSONObject();
-			for(TypedTelemetryData tdata: typeList) {
-				JSONObject itemObj = new JSONObject();
-				itemObj.put("value", tdata.getValue());
-				itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
-				paramObj.put(tdata.getName(), itemObj);
-			}
-
-			tablet.addValue("param_json", 0,paramObj.toJSONString());
-			this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
 		}
-		
+		//dsw.close();
 	}
 	
 	/**

--
Gitblit v1.9.3