From 11d4be720620abf502d35000e2ed40d30c4023bf Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期一, 24 十一月 2025 16:33:34 +0800
Subject: [PATCH] 修复离线时间展示
---
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 217 ++++++++++++++++++++++++++++++++++++++++++-----------
1 files changed, 170 insertions(+), 47 deletions(-)
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index f62c473..73fe491 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,13 +4,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -18,11 +18,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import com.alibaba.fastjson.JSONObject;
+import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
+import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
-import com.qianwen.mdc.collect.entity.iotdb.CollectData;
-import com.qianwen.mdc.collect.utils.redis.RedisUtil;
+import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
* 閲囬泦鏁版嵁澶勭悊鍏ュ簱
@@ -31,14 +34,19 @@
public class CollectDataService {
private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
- private String DB_PREFIX = "root.f2.";
private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
@Autowired
private IotDBSessionConfig iotdbConfig;
@Autowired
private IotDBCommonService iotDBCommonService;
@Autowired
- private RedisUtil redisUtil;
+ private MqttMessageSender mqttMessageSender;
+
+
+ /**
+ * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚�
+ */
+ public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
private static String TEMPLATE_NAME = "process_param";
@@ -52,36 +60,73 @@
PROCESS_PARAM_MAP.put(7, "ALARM");
}
- // TelemetryDataMessage telemetryDataMessage
+
public void handleCollectData(List<TelemetryData> telemetryDataList) {
for (TelemetryData dt : telemetryDataList) {
+
handleOneWorkstation(dt);
+
+ sendRealtimeDataMsg(dt);
}
}
+ void sendRealtimeDataMsg(TelemetryData dt) {
+ if(ObjectUtil.isEmpty(dt.getDataItems())){
+ return;
+ }
+
+ List<TypedTelemetryData> dataList= new ArrayList<>();
+ String propertyName;
+ for (TelemetryDataItem dataItem : dt.getDataItems()) {
+
+ for (Map<String, String> point : dataItem.getDataPoints()) {
+
+ String[] keys = point.keySet().toArray(new String[0]);
+ for(int i=0;i<keys.length;i++) {
+ TypedTelemetryData tpData = new TypedTelemetryData();
+ propertyName = keys[i];
+ tpData.setTime(dataItem.getTime());
+ tpData.setName(propertyName);
+ tpData.setValue(point.get(propertyName));
+
+ dataList.add(tpData);
+ }
+
+ }
+
+ }
+
+ //鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡
+ for(TypedTelemetryData item : dataList) {
+ JSONObject json = new JSONObject();
+ json.put("workstationId",dt.getWorkstationId());
+ json.put("name", item.getName());
+ json.put("value", item.getValue());
+ json.put("time", item.getTime());
+
+ mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
+ }
+
+ }
/**
* 澶勭悊涓�涓伐浣嶇殑鏁版嵁瑙f瀽鍏ュ簱
* @param dt
*/
void handleOneWorkstation(TelemetryData dt) {
- String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
-
- // 鎸傝浇妯℃澘
- //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
-
- List<MeasurementSchema> schemas = new ArrayList<>();
+ String deviceId;
+ //long workstationId = dpVo.getWorkstationId();
+
+ List<MeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
-
int rowIndex = 0;
Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
-
String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
String name;
@@ -90,54 +135,132 @@
List<TypedTelemetryData> typeList = processParamsMap.get(name);
deviceId = generateDeviceId(dt.getWorkstationId(),name);
- System.out.println("deivcdid="+deviceId);
- iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
+ //System.out.println("deivcdid="+deviceId);
+ iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
Tablet tablet = new Tablet(deviceId, schemas);
for(TypedTelemetryData tdata : typeList) {
+
rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, tdata.getTime());
tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
tablet.addValue("n",rowIndex,tdata.getName());
- tablet.addValue("v",rowIndex,tdata.getValue());
+ if(!tdata.getName().equals("Alarm")) {
+ tablet.addValue("v",rowIndex,tdata.getValue());
+ }else {
+ //鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡涓簀son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+ tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
+ }
}
try {
iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+ //updateLastParam(dt.getWorkstationId(),typeList);
} catch (Exception e) {
log.error("IOTDB鍏ュ簱澶辫触",e);
- e.printStackTrace();
- }
- //System.out.println(typeList);
- }
- /*
- for (TelemetryDataItem dataItem : dt.getDataItems()) {
-
- for (Map<String, String> point : dataItem.getDataPoints()) {
-
- String[] keys = point.keySet().toArray(new String[0]);
- for(int i=0;i<keys.length;i++) {
- rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, dataItem.getTime());
- tablet.addValue("workstation_id",rowIndex,new Long(dt.getWorkstationId()));
- propertyName = keys[i];
- deviceId = generateDeviceId(dt.getWorkstationId(),propertyName);//DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId()+"_"+propertyName;
- tablet.addValue("n",rowIndex,propertyName);
- tablet.addValue("v",rowIndex,point.get(propertyName));
- }
-
- }
-
- try {
- iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
- } catch (Exception e) {
- log.error("IOTDB鍏ュ簱澶辫触",e);
- e.printStackTrace();
+ }finally {
+ //iotdbConfig.getSessionPool().clo1se();
}
}
- */
+
}
+ /**
+ * 灏嗘姤璀︿俊鎭牸寮忓寲鏈猨son瀵硅薄(鍘熺増鏄痡sonobj鎴栬�卝son鏁扮粍)锛屾牸寮忎负 {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+ * @param collectAlarmValue
+ * @return
+ */
+ String formatAlarmMsg(long time,String collectAlarmValue){
+ JSONObject alarmObj = new JSONObject();
+ alarmObj.put("timestamp", time);
+ alarmObj.put("code", "00");
+ alarmObj.put("msg", collectAlarmValue);
+ alarmObj.put("alarmtype", 0);
+ alarmObj.put("level", "");
+ return alarmObj.toJSONString();
+ }
+
+ /**
+ * 濉厖鏈�鏂板弬鏁扮殑鏁版嵁
+ * @param typeList
+ * @throws StatementExecutionException
+ * @throws IoTDBConnectionException
+ */
+ void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException {
+ if(typeList.isEmpty()){
+ return;
+ }
+ long updateTime = typeList.get(0).getTime();
+ List<MeasurementSchema> schemas = new ArrayList<>();
+
+ schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
+ schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
+ schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
+
+ Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
+
+
+ String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
+
+ try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
+ if(dsw.hasNext()) {
+ RowRecord rec = dsw.next();
+ long time = rec.getTimestamp();
+
+ String paramJsonStr = rec.getFields().get(2).getStringValue();
+
+ tablet.rowSize = 1;
+ tablet.addTimestamp(0, time);
+ tablet.addValue("update_time", 0, updateTime);
+ tablet.addValue("workstation_id", 0, workstationId);
+ JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
+ for(TypedTelemetryData tdata: typeList) {
+
+ if(paramObj.containsKey(tdata.getName())) {
+ JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+ }else {
+ JSONObject itemObj = new JSONObject();
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+
+ }
+ }
+ tablet.addValue("param_json", 0, paramObj.toJSONString());
+
+ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+
+ }else {
+ //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
+ tablet.rowSize = 1;
+
+ tablet.addTimestamp(0, updateTime);
+ tablet.addValue("update_time", 0, updateTime);
+ tablet.addValue("workstation_id", 0, workstationId);
+
+ JSONObject paramObj = new JSONObject();
+ for(TypedTelemetryData tdata: typeList) {
+ JSONObject itemObj = new JSONObject();
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+ }
+
+ tablet.addValue("param_json", 0,paramObj.toJSONString());
+ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+ }
+ }
+ //dsw.close();
+ }
+
+ /**
+ * 瑙f瀽鏁版嵁锛屽舰鎴恘ame - value瀵圭被鍨嬬殑鏁版嵁鍒楄〃锛屽苟鐢╪ame杩涜鍒嗙粍
+ * @param dt
+ * @return 鎸塶ame鍒嗙粍鍚庣殑鏁版嵁
+ */
Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){
List<TypedTelemetryData> list = new ArrayList<>();
@@ -166,7 +289,7 @@
}
String generateDeviceId(long workstationId,String propertyName) {
- return DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
+ return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
}
}
--
Gitblit v1.9.3