From 859d6321b1b1c606de09e9b6a6286aaeace638fe Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期三, 16 十月 2024 21:03:52 +0800 Subject: [PATCH] 接收采集数据,增加了使用配置数据点限制,配置了的数据点才会保存。未配置则丢弃 --- collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 18 ++++++++++-------- 1 files changed, 10 insertions(+), 8 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java index 6e86e38..f599308 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java @@ -25,6 +25,7 @@ import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.mqtt.MqttMessageSender; +import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; /** * 閲囬泦鏁版嵁澶勭悊鍏ュ簱 @@ -33,7 +34,6 @@ public class CollectDataService { private static final Logger log = LoggerFactory.getLogger(CollectDataService.class); - //private String DB_PREFIX = "root.f2."; private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); @Autowired private IotDBSessionConfig iotdbConfig; @@ -41,6 +41,7 @@ private IotDBCommonService iotDBCommonService; @Autowired private MqttMessageSender mqttMessageSender; + /** * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚� @@ -63,6 +64,7 @@ public void handleCollectData(List<TelemetryData> telemetryDataList) { for (TelemetryData dt : telemetryDataList) { + handleOneWorkstation(dt); sendRealtimeDataMsg(dt); @@ -113,14 +115,14 @@ * @param dt */ void handleOneWorkstation(TelemetryData dt) { - String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); - - List<MeasurementSchema> schemas = new ArrayList<>(); + String deviceId; + //long workstationId = dpVo.getWorkstationId(); + + List<MeasurementSchema> schemas = new ArrayList<>(); schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); - int rowIndex = 0; @@ -137,6 +139,8 @@ iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId); Tablet tablet = new Tablet(deviceId, schemas); for(TypedTelemetryData tdata : typeList) { + + rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, tdata.getTime()); tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId()); @@ -144,18 +148,16 @@ if(!tdata.getName().equals("Alarm")) { tablet.addValue("v",rowIndex,tdata.getValue()); }else { - //鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡鏈猨son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} + //鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡涓簀son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue())); } } try { iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - //updateLastParam(dt.getWorkstationId(),typeList); } catch (Exception e) { log.error("IOTDB鍏ュ簱澶辫触",e); - e.printStackTrace(); }finally { //iotdbConfig.getSessionPool().clo1se(); } -- Gitblit v1.9.3