From fca5c28c79b061f4db3658f6d9e043024f326962 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期五, 14 十一月 2025 21:58:02 +0800
Subject: [PATCH] 离线默认配置改为300秒
---
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 191 ++++++++++++++++++++++++++++++++---------------
1 files changed, 128 insertions(+), 63 deletions(-)
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index 5967f24..73fe491 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,7 +4,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.stream.Collectors;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
@@ -20,10 +19,13 @@
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
+import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
+import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
-import com.qianwen.mdc.collect.utils.redis.RedisUtil;
+import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
* 閲囬泦鏁版嵁澶勭悊鍏ュ簱
@@ -32,12 +34,19 @@
public class CollectDataService {
private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
- private String DB_PREFIX = "root.f2.";
private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
@Autowired
private IotDBSessionConfig iotdbConfig;
@Autowired
private IotDBCommonService iotDBCommonService;
+ @Autowired
+ private MqttMessageSender mqttMessageSender;
+
+
+ /**
+ * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚�
+ */
+ public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
private static String TEMPLATE_NAME = "process_param";
@@ -55,32 +64,69 @@
public void handleCollectData(List<TelemetryData> telemetryDataList) {
for (TelemetryData dt : telemetryDataList) {
+
handleOneWorkstation(dt);
+
+ sendRealtimeDataMsg(dt);
}
}
+ void sendRealtimeDataMsg(TelemetryData dt) {
+ if(ObjectUtil.isEmpty(dt.getDataItems())){
+ return;
+ }
+
+ List<TypedTelemetryData> dataList= new ArrayList<>();
+ String propertyName;
+ for (TelemetryDataItem dataItem : dt.getDataItems()) {
+
+ for (Map<String, String> point : dataItem.getDataPoints()) {
+
+ String[] keys = point.keySet().toArray(new String[0]);
+ for(int i=0;i<keys.length;i++) {
+ TypedTelemetryData tpData = new TypedTelemetryData();
+ propertyName = keys[i];
+ tpData.setTime(dataItem.getTime());
+ tpData.setName(propertyName);
+ tpData.setValue(point.get(propertyName));
+
+ dataList.add(tpData);
+ }
+
+ }
+
+ }
+
+ //鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡
+ for(TypedTelemetryData item : dataList) {
+ JSONObject json = new JSONObject();
+ json.put("workstationId",dt.getWorkstationId());
+ json.put("name", item.getName());
+ json.put("value", item.getValue());
+ json.put("time", item.getTime());
+
+ mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
+ }
+
+ }
/**
* 澶勭悊涓�涓伐浣嶇殑鏁版嵁瑙f瀽鍏ュ簱
* @param dt
*/
void handleOneWorkstation(TelemetryData dt) {
- String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
-
- // 鎸傝浇妯℃澘
- //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
-
- List<MeasurementSchema> schemas = new ArrayList<>();
+ String deviceId;
+ //long workstationId = dpVo.getWorkstationId();
+
+ List<MeasurementSchema> schemas = new ArrayList<>();
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
-
int rowIndex = 0;
Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
-
String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
String name;
@@ -93,24 +139,45 @@
iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
Tablet tablet = new Tablet(deviceId, schemas);
for(TypedTelemetryData tdata : typeList) {
+
rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, tdata.getTime());
tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
tablet.addValue("n",rowIndex,tdata.getName());
- tablet.addValue("v",rowIndex,tdata.getValue());
+ if(!tdata.getName().equals("Alarm")) {
+ tablet.addValue("v",rowIndex,tdata.getValue());
+ }else {
+ //鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡涓簀son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+ tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
+ }
}
try {
iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-
- updateLastParam(dt.getWorkstationId(),typeList);
+ //updateLastParam(dt.getWorkstationId(),typeList);
} catch (Exception e) {
log.error("IOTDB鍏ュ簱澶辫触",e);
- e.printStackTrace();
+ }finally {
+ //iotdbConfig.getSessionPool().clo1se();
}
}
+ }
+
+ /**
+ * 灏嗘姤璀︿俊鎭牸寮忓寲鏈猨son瀵硅薄(鍘熺増鏄痡sonobj鎴栬�卝son鏁扮粍)锛屾牸寮忎负 {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+ * @param collectAlarmValue
+ * @return
+ */
+ String formatAlarmMsg(long time,String collectAlarmValue){
+ JSONObject alarmObj = new JSONObject();
+ alarmObj.put("timestamp", time);
+ alarmObj.put("code", "00");
+ alarmObj.put("msg", collectAlarmValue);
+ alarmObj.put("alarmtype", 0);
+ alarmObj.put("level", "");
+ return alarmObj.toJSONString();
}
/**
@@ -131,64 +198,62 @@
schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
- for(TypedTelemetryData tdata: typeList) {
-
- }
-
+
String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
- SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
- if(dsw.hasNext()) {
- RowRecord rec = dsw.next();
- long time = rec.getTimestamp();
-
- String paramJsonStr = rec.getFields().get(2).getStringValue();
-
- tablet.rowSize = 1;
- tablet.addTimestamp(0, time);
- tablet.addValue("update_time", 0, updateTime);
- tablet.addValue("workstation_id", 0, workstationId);
- JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
- for(TypedTelemetryData tdata: typeList) {
+ try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
+ if(dsw.hasNext()) {
+ RowRecord rec = dsw.next();
+ long time = rec.getTimestamp();
- if(paramObj.containsKey(tdata.getName())) {
- JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
- itemObj.put("value", tdata.getValue());
- itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
- paramObj.put(tdata.getName(), itemObj);
- }else {
+ String paramJsonStr = rec.getFields().get(2).getStringValue();
+
+ tablet.rowSize = 1;
+ tablet.addTimestamp(0, time);
+ tablet.addValue("update_time", 0, updateTime);
+ tablet.addValue("workstation_id", 0, workstationId);
+ JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
+ for(TypedTelemetryData tdata: typeList) {
+
+ if(paramObj.containsKey(tdata.getName())) {
+ JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+ }else {
+ JSONObject itemObj = new JSONObject();
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+
+ }
+ }
+ tablet.addValue("param_json", 0, paramObj.toJSONString());
+
+ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+
+ }else {
+ //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
+ tablet.rowSize = 1;
+
+ tablet.addTimestamp(0, updateTime);
+ tablet.addValue("update_time", 0, updateTime);
+ tablet.addValue("workstation_id", 0, workstationId);
+
+ JSONObject paramObj = new JSONObject();
+ for(TypedTelemetryData tdata: typeList) {
JSONObject itemObj = new JSONObject();
itemObj.put("value", tdata.getValue());
itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
paramObj.put(tdata.getName(), itemObj);
-
}
+
+ tablet.addValue("param_json", 0,paramObj.toJSONString());
+ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
}
- tablet.addValue("param_json", 0, paramObj.toJSONString());
-
- this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-
- }else {
- //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
- tablet.rowSize = 1;
-
- tablet.addTimestamp(0, updateTime);
- tablet.addValue("update_time", 0, updateTime);
- tablet.addValue("workstation_id", 0, workstationId);
-
- JSONObject paramObj = new JSONObject();
- for(TypedTelemetryData tdata: typeList) {
- JSONObject itemObj = new JSONObject();
- itemObj.put("value", tdata.getValue());
- itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
- paramObj.put(tdata.getName(), itemObj);
- }
-
- tablet.addValue("param_json", 0,paramObj.toJSONString());
- this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
}
-
+ //dsw.close();
}
/**
@@ -224,7 +289,7 @@
}
String generateDeviceId(long workstationId,String propertyName) {
- return DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
+ return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
}
}
--
Gitblit v1.9.3