yangys
2024-10-08 ba9557b5edf7f31ab8bce0ae57aaaaefd6459bb4
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -20,11 +20,15 @@
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import cn.hutool.json.JSONUtil;
/**
 * 采集数据处理入库
@@ -39,6 +43,13 @@
   private IotDBSessionConfig iotdbConfig;
   @Autowired
   private IotDBCommonService iotDBCommonService;
   @Autowired
   private MqttMessageSender mqttMessageSender;
   /**
    * Real-time data topic; it must be identical to the topic used in mdc.
    * NOTE(review): "WOCKSTATION" looks like a typo of "WORKSTATION"; the constant is
    * public, so renaming it would require updating every external reference - confirm first.
    */
   public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
   
   private static String TEMPLATE_NAME = "process_param";
@@ -57,19 +68,56 @@
      for (TelemetryData dt : telemetryDataList) {
         handleOneWorkstation(dt);
         sendRealtimeDataMsg(dt);
      }
   }
   void sendRealtimeDataMsg(TelemetryData dt) {
      if(ObjectUtil.isEmpty(dt.getDataItems())){
         return;
      }
      List<TypedTelemetryData> dataList= new ArrayList<>();
      String propertyName;
      for (TelemetryDataItem dataItem : dt.getDataItems()) {
         for (Map<String, String> point : dataItem.getDataPoints()) {
              String[] keys = point.keySet().toArray(new String[0]);
              for(int i=0;i<keys.length;i++) {
                 TypedTelemetryData tpData = new TypedTelemetryData();
                 propertyName = keys[i];
                 tpData.setTime(dataItem.getTime());
                 tpData.setName(propertyName);
                 tpData.setValue(point.get(propertyName));
                 dataList.add(tpData);
              }
         }
      }
      //发送mqtt消息,通知mdc消息来了
      for(TypedTelemetryData item : dataList) {
         JSONObject json = new JSONObject();
         json.put("workstationId",dt.getWorkstationId());
         json.put("name", item.getName());
         json.put("value", item.getValue());
         json.put("time", item.getTime());
         mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
      }
   }
   /**
    * 处理一个工位的数据解析入库
    * @param dt
    */
   void handleOneWorkstation(TelemetryData dt) {
      String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
      // 挂载模板
      //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
      List<MeasurementSchema> schemas = new ArrayList<>();
      
@@ -109,10 +157,12 @@
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            
            updateLastParam(dt.getWorkstationId(),typeList);
            //updateLastParam(dt.getWorkstationId(),typeList);
         } catch (Exception e) {
            log.error("IOTDB入库失败",e);
            e.printStackTrace();
         }finally {
            //iotdbConfig.getSessionPool().clo1se();
         }
      }
      
@@ -152,64 +202,62 @@
      schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
      
      Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
      for(TypedTelemetryData tdata: typeList) {
      }
      
      String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
      SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
      
      if(dsw.hasNext()) {
         RowRecord rec = dsw.next();
         long time = rec.getTimestamp();
         String paramJsonStr = rec.getFields().get(2).getStringValue();
         tablet.rowSize = 1;
         tablet.addTimestamp(0, time);
         tablet.addValue("update_time", 0, updateTime);
         tablet.addValue("workstation_id", 0, workstationId);
         JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
         for(TypedTelemetryData tdata: typeList) {
      try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
         if(dsw.hasNext()) {
            RowRecord rec = dsw.next();
            long time = rec.getTimestamp();
            
            if(paramObj.containsKey(tdata.getName())) {
               JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }else {
            String paramJsonStr = rec.getFields().get(2).getStringValue();
            tablet.rowSize = 1;
            tablet.addTimestamp(0, time);
            tablet.addValue("update_time", 0, updateTime);
            tablet.addValue("workstation_id", 0, workstationId);
            JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
            for(TypedTelemetryData tdata: typeList) {
               if(paramObj.containsKey(tdata.getName())) {
                  JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
                  itemObj.put("value", tdata.getValue());
                  itemObj.put("time", tdata.getTime());//采集时间
                  paramObj.put(tdata.getName(), itemObj);
               }else {
                  JSONObject itemObj = new JSONObject();
                  itemObj.put("value", tdata.getValue());
                  itemObj.put("time", tdata.getTime());//采集时间
                  paramObj.put(tdata.getName(), itemObj);
               }
            }
            tablet.addValue("param_json", 0, paramObj.toJSONString());
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         }else {
            //没数据,新加入一条
            tablet.rowSize = 1;
            tablet.addTimestamp(0, updateTime);
            tablet.addValue("update_time", 0, updateTime);
            tablet.addValue("workstation_id", 0, workstationId);
            JSONObject paramObj = new JSONObject();
            for(TypedTelemetryData tdata: typeList) {
               JSONObject itemObj = new JSONObject();
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }
            tablet.addValue("param_json", 0,paramObj.toJSONString());
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         }
         tablet.addValue("param_json", 0, paramObj.toJSONString());
         this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
      }else {
         //没数据,新加入一条
         tablet.rowSize = 1;
         tablet.addTimestamp(0, updateTime);
         tablet.addValue("update_time", 0, updateTime);
         tablet.addValue("workstation_id", 0, workstationId);
         JSONObject paramObj = new JSONObject();
         for(TypedTelemetryData tdata: typeList) {
            JSONObject itemObj = new JSONObject();
            itemObj.put("value", tdata.getValue());
            itemObj.put("time", tdata.getTime());//采集时间
            paramObj.put(tdata.getName(), itemObj);
         }
         tablet.addValue("param_json", 0,paramObj.toJSONString());
         this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
      }
      //dsw.close();
   }
   
   /**