From a33c33d48c2c16995130b825355b6883be4eb159 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期一, 02 九月 2024 15:26:41 +0800
Subject: [PATCH] 加入定时任务,启动时打固定点 ,并且加入工位数据缓存

---
 collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java |  128 +++++++++++++++++++++++++++++++-----------
 1 files changed, 93 insertions(+), 35 deletions(-)

diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index f62c473..5967f24 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,13 +4,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Random;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -18,10 +19,10 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import com.alibaba.fastjson.JSONObject;
 import com.qianwen.mdc.collect.config.IotDBSessionConfig;
 import com.qianwen.mdc.collect.domain.TelemetryData;
 import com.qianwen.mdc.collect.domain.TelemetryDataItem;
-import com.qianwen.mdc.collect.entity.iotdb.CollectData;
 import com.qianwen.mdc.collect.utils.redis.RedisUtil;
 
 /**
@@ -37,8 +38,6 @@
 	private IotDBSessionConfig iotdbConfig;
 	@Autowired
 	private IotDBCommonService iotDBCommonService;
-	@Autowired
-	private  RedisUtil redisUtil;
 	
 	private static String TEMPLATE_NAME = "process_param";
 
@@ -52,7 +51,7 @@
 		PROCESS_PARAM_MAP.put(7, "ALARM");
 	}
 
-	// TelemetryDataMessage telemetryDataMessage
+	
 	public void handleCollectData(List<TelemetryData> telemetryDataList) {
 
 		for (TelemetryData dt : telemetryDataList) {
@@ -90,8 +89,8 @@
 			List<TypedTelemetryData> typeList = processParamsMap.get(name);
 			
 			deviceId = generateDeviceId(dt.getWorkstationId(),name);
-			System.out.println("deivcdid="+deviceId);
-			iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
+			//System.out.println("deivcdid="+deviceId);
+			iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
 			Tablet tablet = new Tablet(deviceId, schemas);
 			for(TypedTelemetryData tdata : typeList) {
 				rowIndex = tablet.rowSize++;
@@ -103,41 +102,100 @@
 			
 			try {
 				iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-			} catch (Exception e) {
-				log.error("IOTDB鍏ュ簱澶辫触",e);
-				e.printStackTrace();
-			}
-			//System.out.println(typeList);
-		}
-		/*
-		for (TelemetryDataItem dataItem : dt.getDataItems()) {
-
-			for (Map<String, String> point : dataItem.getDataPoints()) {
-		        
-		        String[] keys = point.keySet().toArray(new String[0]);
-		        for(int i=0;i<keys.length;i++) {
-		        	rowIndex = tablet.rowSize++;
-		        	tablet.addTimestamp(rowIndex, dataItem.getTime());
-		        	tablet.addValue("workstation_id",rowIndex,new Long(dt.getWorkstationId()));
-		        	propertyName = keys[i];
-		        	deviceId = generateDeviceId(dt.getWorkstationId(),propertyName);//DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId()+"_"+propertyName;
-		        	tablet.addValue("n",rowIndex,propertyName);
-		        	tablet.addValue("v",rowIndex,point.get(propertyName));
-		        }
-		  
-			}
-			
-			try {
-				iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+				
+				updateLastParam(dt.getWorkstationId(),typeList);
 			} catch (Exception e) {
 				log.error("IOTDB鍏ュ簱澶辫触",e);
 				e.printStackTrace();
 			}
 		}
-		*/
+		
 
 	}
 	
+	/**
+	 * 濉厖鏈�鏂板弬鏁扮殑鏁版嵁
+	 * @param typeList
+	 * @throws StatementExecutionException 
+	 * @throws IoTDBConnectionException 
+	 */
+	void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException {
+		if(typeList.isEmpty()){
+			return;
+		}
+		long updateTime = typeList.get(0).getTime();
+		List<MeasurementSchema> schemas = new ArrayList<>();
+		
+		schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
+		schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
+		schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
+		
+		Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
+		for(TypedTelemetryData tdata: typeList) {
+			
+		}
+	
+		
+		String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
+		SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
+		
+		if(dsw.hasNext()) {
+			RowRecord rec = dsw.next();
+			long time = rec.getTimestamp();
+			
+			String paramJsonStr = rec.getFields().get(2).getStringValue();
+			
+			tablet.rowSize = 1;
+			tablet.addTimestamp(0, time);
+			tablet.addValue("update_time", 0, updateTime);
+			tablet.addValue("workstation_id", 0, workstationId);
+			JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
+			for(TypedTelemetryData tdata: typeList) {
+				
+				if(paramObj.containsKey(tdata.getName())) {
+					JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
+					itemObj.put("value", tdata.getValue());
+					itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+					paramObj.put(tdata.getName(), itemObj);
+				}else {
+					JSONObject itemObj = new JSONObject();
+					itemObj.put("value", tdata.getValue());
+					itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+					paramObj.put(tdata.getName(), itemObj);
+					
+				}
+			}
+			tablet.addValue("param_json", 0, paramObj.toJSONString());
+			
+			this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+
+		}else {
+			//娌℃暟鎹紝鏂板姞鍏ヤ竴鏉�
+			tablet.rowSize = 1;
+			
+			tablet.addTimestamp(0, updateTime);
+			tablet.addValue("update_time", 0, updateTime);
+			tablet.addValue("workstation_id", 0, workstationId);
+			
+			JSONObject paramObj = new JSONObject();
+			for(TypedTelemetryData tdata: typeList) {
+				JSONObject itemObj = new JSONObject();
+				itemObj.put("value", tdata.getValue());
+				itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿
+				paramObj.put(tdata.getName(), itemObj);
+			}
+
+			tablet.addValue("param_json", 0,paramObj.toJSONString());
+			this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+		}
+		
+	}
+	
+	/**
+	 * 瑙f瀽鏁版嵁锛屽舰鎴恘ame - value瀵圭被鍨嬬殑鏁版嵁鍒楄〃锛屽苟鐢╪ame杩涜鍒嗙粍
+	 * @param dt
+	 * @return 鎸塶ame鍒嗙粍鍚庣殑鏁版嵁
+	 */
 	Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){
 		List<TypedTelemetryData> list = new ArrayList<>();
 		

--
Gitblit v1.9.3