yangys
2025-11-24 11d4be720620abf502d35000e2ed40d30c4023bf
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,7 +4,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
@@ -20,10 +19,13 @@
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
 * 采集数据处理入库
@@ -32,12 +34,19 @@
public class CollectDataService {
   private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
   
   private String DB_PREFIX = "root.f2.";
   private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
   @Autowired
   private IotDBSessionConfig iotdbConfig;
   @Autowired
   private IotDBCommonService iotDBCommonService;
   @Autowired
   private MqttMessageSender mqttMessageSender;
   /**
    * 实时数据topic,要与mdc里面得相同
    */
   public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
   
   private static String TEMPLATE_NAME = "process_param";
@@ -55,32 +64,69 @@
   public void handleCollectData(List<TelemetryData> telemetryDataList) {
      for (TelemetryData dt : telemetryDataList) {
         handleOneWorkstation(dt);
         sendRealtimeDataMsg(dt);
      }
   }
   void sendRealtimeDataMsg(TelemetryData dt) {
      if(ObjectUtil.isEmpty(dt.getDataItems())){
         return;
      }
      List<TypedTelemetryData> dataList= new ArrayList<>();
      String propertyName;
      for (TelemetryDataItem dataItem : dt.getDataItems()) {
         for (Map<String, String> point : dataItem.getDataPoints()) {
              String[] keys = point.keySet().toArray(new String[0]);
              for(int i=0;i<keys.length;i++) {
                 TypedTelemetryData tpData = new TypedTelemetryData();
                 propertyName = keys[i];
                 tpData.setTime(dataItem.getTime());
                 tpData.setName(propertyName);
                 tpData.setValue(point.get(propertyName));
                 dataList.add(tpData);
              }
         }
      }
      //发送mqtt消息,通知mdc消息来了
      for(TypedTelemetryData item : dataList) {
         JSONObject json = new JSONObject();
         json.put("workstationId",dt.getWorkstationId());
         json.put("name", item.getName());
         json.put("value", item.getValue());
         json.put("time", item.getTime());
         mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
      }
   }
   /**
    * 处理一个工位的数据解析入库
    * @param dt
    */
   void handleOneWorkstation(TelemetryData dt) {
      String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
      // 挂载模板
      //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
      List<MeasurementSchema> schemas = new ArrayList<>();
      String deviceId;
      
      //long workstationId = dpVo.getWorkstationId();
      List<MeasurementSchema> schemas = new ArrayList<>();
      schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
      schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
      schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
      
      int rowIndex = 0;
      
      Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
      
      String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
      String name;
@@ -93,24 +139,45 @@
         iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
         Tablet tablet = new Tablet(deviceId, schemas);
         for(TypedTelemetryData tdata : typeList) {
            rowIndex = tablet.rowSize++;
            tablet.addTimestamp(rowIndex, tdata.getTime());
            tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
            tablet.addValue("n",rowIndex,tdata.getName());
              tablet.addValue("v",rowIndex,tdata.getValue());
            if(!tdata.getName().equals("Alarm")) {
               tablet.addValue("v",rowIndex,tdata.getValue());
            }else {
               //告警信息根据原版需要处理一下,格式为json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
               tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
            }
         }
         
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            updateLastParam(dt.getWorkstationId(),typeList);
            //updateLastParam(dt.getWorkstationId(),typeList);
         } catch (Exception e) {
            log.error("IOTDB入库失败",e);
            e.printStackTrace();
         }finally {
            //iotdbConfig.getSessionPool().clo1se();
         }
      }
      
   }
   /**
    * 将报警信息格式化未json对象(原版是jsonobj或者json数组),格式为  {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
    * @param collectAlarmValue
    * @return
    */
   String formatAlarmMsg(long time,String collectAlarmValue){
      JSONObject alarmObj = new JSONObject();
      alarmObj.put("timestamp", time);
      alarmObj.put("code", "00");
      alarmObj.put("msg", collectAlarmValue);
      alarmObj.put("alarmtype", 0);
      alarmObj.put("level", "");
      return alarmObj.toJSONString();
   }
   
   /**
@@ -131,64 +198,62 @@
      schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
      
      Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
      for(TypedTelemetryData tdata: typeList) {
      }
      
      String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
      SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql);
      
      if(dsw.hasNext()) {
         RowRecord rec = dsw.next();
         long time = rec.getTimestamp();
         String paramJsonStr = rec.getFields().get(2).getStringValue();
         tablet.rowSize = 1;
         tablet.addTimestamp(0, time);
         tablet.addValue("update_time", 0, updateTime);
         tablet.addValue("workstation_id", 0, workstationId);
         JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
         for(TypedTelemetryData tdata: typeList) {
      try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
         if(dsw.hasNext()) {
            RowRecord rec = dsw.next();
            long time = rec.getTimestamp();
            
            if(paramObj.containsKey(tdata.getName())) {
               JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }else {
            String paramJsonStr = rec.getFields().get(2).getStringValue();
            tablet.rowSize = 1;
            tablet.addTimestamp(0, time);
            tablet.addValue("update_time", 0, updateTime);
            tablet.addValue("workstation_id", 0, workstationId);
            JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
            for(TypedTelemetryData tdata: typeList) {
               if(paramObj.containsKey(tdata.getName())) {
                  JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
                  itemObj.put("value", tdata.getValue());
                  itemObj.put("time", tdata.getTime());//采集时间
                  paramObj.put(tdata.getName(), itemObj);
               }else {
                  JSONObject itemObj = new JSONObject();
                  itemObj.put("value", tdata.getValue());
                  itemObj.put("time", tdata.getTime());//采集时间
                  paramObj.put(tdata.getName(), itemObj);
               }
            }
            tablet.addValue("param_json", 0, paramObj.toJSONString());
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         }else {
            //没数据,新加入一条
            tablet.rowSize = 1;
            tablet.addTimestamp(0, updateTime);
            tablet.addValue("update_time", 0, updateTime);
            tablet.addValue("workstation_id", 0, workstationId);
            JSONObject paramObj = new JSONObject();
            for(TypedTelemetryData tdata: typeList) {
               JSONObject itemObj = new JSONObject();
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }
            tablet.addValue("param_json", 0,paramObj.toJSONString());
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         }
         tablet.addValue("param_json", 0, paramObj.toJSONString());
         this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
      }else {
         //没数据,新加入一条
         tablet.rowSize = 1;
         tablet.addTimestamp(0, updateTime);
         tablet.addValue("update_time", 0, updateTime);
         tablet.addValue("workstation_id", 0, workstationId);
         JSONObject paramObj = new JSONObject();
         for(TypedTelemetryData tdata: typeList) {
            JSONObject itemObj = new JSONObject();
            itemObj.put("value", tdata.getValue());
            itemObj.put("time", tdata.getTime());//采集时间
            paramObj.put(tdata.getName(), itemObj);
         }
         tablet.addValue("param_json", 0,paramObj.toJSONString());
         this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
      }
      //dsw.close();
   }
   
   /**
@@ -224,7 +289,7 @@
   }
   
   String generateDeviceId(long workstationId,String propertyName) {
      return DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
      return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
   }
}