From ba9557b5edf7f31ab8bce0ae57aaaaefd6459bb4 Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期二, 08 十月 2024 14:06:54 +0800 Subject: [PATCH] 处理iot链接超时问题 --- collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 150 +++++++++++++++++++++++++++++++++----------------- 1 files changed, 99 insertions(+), 51 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java index 8c2d04b..f219f26 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java @@ -20,11 +20,15 @@ import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; +import com.qianwen.core.tool.utils.ObjectUtil; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; +import com.qianwen.mdc.collect.mqtt.MqttMessageSender; import com.qianwen.mdc.collect.utils.redis.RedisUtil; + +import cn.hutool.json.JSONUtil; /** * 閲囬泦鏁版嵁澶勭悊鍏ュ簱 @@ -39,6 +43,13 @@ private IotDBSessionConfig iotdbConfig; @Autowired private IotDBCommonService iotDBCommonService; + @Autowired + private MqttMessageSender mqttMessageSender; + + /** + * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚� + */ + public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; private static String TEMPLATE_NAME = "process_param"; @@ -57,19 +68,56 @@ for (TelemetryData dt : telemetryDataList) { handleOneWorkstation(dt); + + sendRealtimeDataMsg(dt); } } + void sendRealtimeDataMsg(TelemetryData dt) { + if(ObjectUtil.isEmpty(dt.getDataItems())){ + return; + } + + List<TypedTelemetryData> dataList= new ArrayList<>(); + String propertyName; + for (TelemetryDataItem dataItem : dt.getDataItems()) { + + for (Map<String, String> point : dataItem.getDataPoints()) { + + String[] keys = point.keySet().toArray(new String[0]); + for(int i=0;i<keys.length;i++) { + TypedTelemetryData tpData = new TypedTelemetryData(); + propertyName = keys[i]; + tpData.setTime(dataItem.getTime()); + tpData.setName(propertyName); + tpData.setValue(point.get(propertyName)); + + dataList.add(tpData); + } + + } + + } + + //鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡 + for(TypedTelemetryData item : dataList) { + JSONObject json = new JSONObject(); + json.put("workstationId",dt.getWorkstationId()); + json.put("name", item.getName()); + json.put("value", item.getValue()); + json.put("time", item.getTime()); + + mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString()); + } + + } /** * 澶勭悊涓�涓伐浣嶇殑鏁版嵁瑙f瀽鍏ュ簱 * @param dt */ void handleOneWorkstation(TelemetryData dt) { String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); - - // 鎸傝浇妯℃澘 - //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId); List<MeasurementSchema> schemas = new ArrayList<>(); @@ -109,10 +157,12 @@ try { iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - updateLastParam(dt.getWorkstationId(),typeList); + //updateLastParam(dt.getWorkstationId(),typeList); } catch (Exception e) { log.error("IOTDB鍏ュ簱澶辫触",e); e.printStackTrace(); + }finally { + //iotdbConfig.getSessionPool().clo1se(); } } @@ -152,64 +202,62 @@ schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT)); Tablet tablet = new Tablet("root.f2.last_process_param", schemas); - for(TypedTelemetryData tdata: typeList) { - - } - + String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId; - SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql); - if(dsw.hasNext()) { - RowRecord rec = dsw.next(); - long time = rec.getTimestamp(); - - String paramJsonStr = rec.getFields().get(2).getStringValue(); - - tablet.rowSize = 1; - tablet.addTimestamp(0, time); - tablet.addValue("update_time", 0, updateTime); - tablet.addValue("workstation_id", 0, workstationId); - JSONObject paramObj = JSONObject.parseObject(paramJsonStr); - for(TypedTelemetryData tdata: typeList) { + try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){ + if(dsw.hasNext()) { + RowRecord rec = dsw.next(); + long time = rec.getTimestamp(); - if(paramObj.containsKey(tdata.getName())) { - JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); - itemObj.put("value", tdata.getValue()); - itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 - paramObj.put(tdata.getName(), itemObj); - }else { + String paramJsonStr = rec.getFields().get(2).getStringValue(); + + tablet.rowSize = 1; + tablet.addTimestamp(0, time); + tablet.addValue("update_time", 0, updateTime); + tablet.addValue("workstation_id", 0, workstationId); + JSONObject paramObj = JSONObject.parseObject(paramJsonStr); + for(TypedTelemetryData tdata: typeList) { + + if(paramObj.containsKey(tdata.getName())) { + JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); + itemObj.put("value", tdata.getValue()); + itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 + paramObj.put(tdata.getName(), itemObj); + }else { + JSONObject itemObj = new JSONObject(); + itemObj.put("value", tdata.getValue()); + itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 + paramObj.put(tdata.getName(), itemObj); + + } + } + tablet.addValue("param_json", 0, paramObj.toJSONString()); + + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + + }else { + //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉� + tablet.rowSize = 1; + + tablet.addTimestamp(0, updateTime); + tablet.addValue("update_time", 0, updateTime); + tablet.addValue("workstation_id", 0, workstationId); + + JSONObject paramObj = new JSONObject(); + for(TypedTelemetryData tdata: typeList) { JSONObject itemObj = new JSONObject(); itemObj.put("value", tdata.getValue()); itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 paramObj.put(tdata.getName(), itemObj); - } + + tablet.addValue("param_json", 0,paramObj.toJSONString()); + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); } - tablet.addValue("param_json", 0, paramObj.toJSONString()); - - this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - - }else { - //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉� - tablet.rowSize = 1; - - tablet.addTimestamp(0, updateTime); - tablet.addValue("update_time", 0, updateTime); - tablet.addValue("workstation_id", 0, workstationId); - - JSONObject paramObj = new JSONObject(); - for(TypedTelemetryData tdata: typeList) { - JSONObject itemObj = new JSONObject(); - itemObj.put("value", tdata.getValue()); - itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 - paramObj.put(tdata.getName(), itemObj); - } - - tablet.addValue("param_json", 0,paramObj.toJSONString()); - this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); } - + //dsw.close(); } /** -- Gitblit v1.9.3