yangys
2024-09-02 a33c33d48c2c16995130b825355b6883be4eb159
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,13 +4,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -18,10 +19,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.entity.iotdb.CollectData;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
/**
@@ -37,8 +38,6 @@
   private IotDBSessionConfig iotdbConfig;
   @Autowired
   private IotDBCommonService iotDBCommonService;
   @Autowired
   private  RedisUtil redisUtil;
   
   private static String TEMPLATE_NAME = "process_param";
@@ -52,7 +51,7 @@
      PROCESS_PARAM_MAP.put(7, "ALARM");
   }
   // TelemetryDataMessage telemetryDataMessage
   public void handleCollectData(List<TelemetryData> telemetryDataList) {
      for (TelemetryData dt : telemetryDataList) {
@@ -90,8 +89,8 @@
         List<TypedTelemetryData> typeList = processParamsMap.get(name);
         
         deviceId = generateDeviceId(dt.getWorkstationId(),name);
         System.out.println("deivcdid="+deviceId);
         iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
         //System.out.println("deivcdid="+deviceId);
         iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
         Tablet tablet = new Tablet(deviceId, schemas);
         for(TypedTelemetryData tdata : typeList) {
            rowIndex = tablet.rowSize++;
@@ -103,41 +102,100 @@
         
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         } catch (Exception e) {
            log.error("IOTDB入库失败",e);
            e.printStackTrace();
         }
         //System.out.println(typeList);
      }
      /*
      for (TelemetryDataItem dataItem : dt.getDataItems()) {
         for (Map<String, String> point : dataItem.getDataPoints()) {
              String[] keys = point.keySet().toArray(new String[0]);
              for(int i=0;i<keys.length;i++) {
                 rowIndex = tablet.rowSize++;
                 tablet.addTimestamp(rowIndex, dataItem.getTime());
                 tablet.addValue("workstation_id",rowIndex,new Long(dt.getWorkstationId()));
                 propertyName = keys[i];
                 deviceId = generateDeviceId(dt.getWorkstationId(),propertyName);//DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId()+"_"+propertyName;
                 tablet.addValue("n",rowIndex,propertyName);
                 tablet.addValue("v",rowIndex,point.get(propertyName));
              }
         }
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            updateLastParam(dt.getWorkstationId(),typeList);
         } catch (Exception e) {
            log.error("IOTDB入库失败",e);
            e.printStackTrace();
         }
      }
      */
   }
   
   /**
    * 填充最新参数的数据
    * @param typeList
    * @throws StatementExecutionException
    * @throws IoTDBConnectionException
    */
   void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException {
      if(typeList.isEmpty()){
         return;
      }
      long updateTime = typeList.get(0).getTime();
      List<MeasurementSchema> schemas = new ArrayList<>();
      schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
      schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
      schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
      Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
      for(TypedTelemetryData tdata: typeList) {
      }
      String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
      SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
      if(dsw.hasNext()) {
         RowRecord rec = dsw.next();
         long time = rec.getTimestamp();
         String paramJsonStr = rec.getFields().get(2).getStringValue();
         tablet.rowSize = 1;
         tablet.addTimestamp(0, time);
         tablet.addValue("update_time", 0, updateTime);
         tablet.addValue("workstation_id", 0, workstationId);
         JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
         for(TypedTelemetryData tdata: typeList) {
            if(paramObj.containsKey(tdata.getName())) {
               JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }else {
               JSONObject itemObj = new JSONObject();
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }
         }
         tablet.addValue("param_json", 0, paramObj.toJSONString());
         this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
      }else {
         //没数据,新加入一条
         tablet.rowSize = 1;
         tablet.addTimestamp(0, updateTime);
         tablet.addValue("update_time", 0, updateTime);
         tablet.addValue("workstation_id", 0, workstationId);
         JSONObject paramObj = new JSONObject();
         for(TypedTelemetryData tdata: typeList) {
            JSONObject itemObj = new JSONObject();
            itemObj.put("value", tdata.getValue());
            itemObj.put("time", tdata.getTime());//采集时间
            paramObj.put(tdata.getName(), itemObj);
         }
         tablet.addValue("param_json", 0,paramObj.toJSONString());
         this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
      }
   }
   /**
    * 解析数据,形成name - value对类型的数据列表,并用name进行分组
    * @param dt
    * @return 按name分组后的数据
    */
   Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){
      List<TypedTelemetryData> list = new ArrayList<>();