From a33c33d48c2c16995130b825355b6883be4eb159 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期一, 02 九月 2024 15:26:41 +0800
Subject: [PATCH] 加入定时任务,启动时打固定点 ,并且加入工位数据缓存
---
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 128 +++++++++++++++++++++++++++++++-----------
1 files changed, 93 insertions(+), 35 deletions(-)
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index f62c473..5967f24 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,13 +4,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Random;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -18,10 +19,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
-import com.qianwen.mdc.collect.entity.iotdb.CollectData;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
/**
@@ -37,8 +38,6 @@
private IotDBSessionConfig iotdbConfig;
@Autowired
private IotDBCommonService iotDBCommonService;
- @Autowired
- private RedisUtil redisUtil;
private static String TEMPLATE_NAME = "process_param";
@@ -52,7 +51,7 @@
PROCESS_PARAM_MAP.put(7, "ALARM");
}
- // TelemetryDataMessage telemetryDataMessage
+
public void handleCollectData(List<TelemetryData> telemetryDataList) {
for (TelemetryData dt : telemetryDataList) {
@@ -90,8 +89,8 @@
List<TypedTelemetryData> typeList = processParamsMap.get(name);
deviceId = generateDeviceId(dt.getWorkstationId(),name);
- System.out.println("deivcdid="+deviceId);
- iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
+ //System.out.println("deivcdid="+deviceId);
+ iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
Tablet tablet = new Tablet(deviceId, schemas);
for(TypedTelemetryData tdata : typeList) {
rowIndex = tablet.rowSize++;
@@ -103,41 +102,100 @@
try {
iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
- } catch (Exception e) {
- log.error("IOTDB鍏ュ簱澶辫触",e);
- e.printStackTrace();
- }
- //System.out.println(typeList);
- }
- /*
- for (TelemetryDataItem dataItem : dt.getDataItems()) {
-
- for (Map<String, String> point : dataItem.getDataPoints()) {
-
- String[] keys = point.keySet().toArray(new String[0]);
- for(int i=0;i<keys.length;i++) {
- rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, dataItem.getTime());
- tablet.addValue("workstation_id",rowIndex,new Long(dt.getWorkstationId()));
- propertyName = keys[i];
- deviceId = generateDeviceId(dt.getWorkstationId(),propertyName);//DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId()+"_"+propertyName;
- tablet.addValue("n",rowIndex,propertyName);
- tablet.addValue("v",rowIndex,point.get(propertyName));
- }
-
- }
-
- try {
- iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+
+ updateLastParam(dt.getWorkstationId(),typeList);
} catch (Exception e) {
log.error("IOTDB鍏ュ簱澶辫触",e);
e.printStackTrace();
}
}
- */
+
}
+ /**
+ * 濉厖鏈�鏂板弬鏁扮殑鏁版嵁
+ * @param typeList
+ * @throws StatementExecutionException
+ * @throws IoTDBConnectionException
+ */
+ void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException {
+ if(typeList.isEmpty()){
+ return;
+ }
+ long updateTime = typeList.get(0).getTime();
+ List<MeasurementSchema> schemas = new ArrayList<>();
+
+ schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
+ schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
+ schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
+
+ Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
+ for(TypedTelemetryData tdata: typeList) {
+
+ }
+
+
+ String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
+ SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
+
+ if(dsw.hasNext()) {
+ RowRecord rec = dsw.next();
+ long time = rec.getTimestamp();
+
+ String paramJsonStr = rec.getFields().get(2).getStringValue();
+
+ tablet.rowSize = 1;
+ tablet.addTimestamp(0, time);
+ tablet.addValue("update_time", 0, updateTime);
+ tablet.addValue("workstation_id", 0, workstationId);
+ JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
+ for(TypedTelemetryData tdata: typeList) {
+
+ if(paramObj.containsKey(tdata.getName())) {
+ JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+ }else {
+ JSONObject itemObj = new JSONObject();
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+
+ }
+ }
+ tablet.addValue("param_json", 0, paramObj.toJSONString());
+
+ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+
+ }else {
+ //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
+ tablet.rowSize = 1;
+
+ tablet.addTimestamp(0, updateTime);
+ tablet.addValue("update_time", 0, updateTime);
+ tablet.addValue("workstation_id", 0, workstationId);
+
+ JSONObject paramObj = new JSONObject();
+ for(TypedTelemetryData tdata: typeList) {
+ JSONObject itemObj = new JSONObject();
+ itemObj.put("value", tdata.getValue());
+ itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+ paramObj.put(tdata.getName(), itemObj);
+ }
+
+ tablet.addValue("param_json", 0,paramObj.toJSONString());
+ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+ }
+
+ }
+
+ /**
+ * 瑙f瀽鏁版嵁锛屽舰鎴恘ame - value瀵圭被鍨嬬殑鏁版嵁鍒楄〃锛屽苟鐢╪ame杩涜鍒嗙粍
+ * @param dt
+ * @return 鎸塶ame鍒嗙粍鍚庣殑鏁版嵁
+ */
Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){
List<TypedTelemetryData> list = new ArrayList<>();
--
Gitblit v1.9.3