yangys
2025-11-24 11d4be720620abf502d35000e2ed40d30c4023bf
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,13 +4,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -18,11 +18,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.entity.iotdb.CollectData;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
 * 采集数据处理入库
@@ -31,14 +34,19 @@
public class CollectDataService {
   private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
   
   private String DB_PREFIX = "root.f2.";
   private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
   @Autowired
   private IotDBSessionConfig iotdbConfig;
   @Autowired
   private IotDBCommonService iotDBCommonService;
   @Autowired
   private  RedisUtil redisUtil;
   private MqttMessageSender mqttMessageSender;
   /**
    * 实时数据topic,要与mdc里面得相同
    */
   public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
   
   private static String TEMPLATE_NAME = "process_param";
@@ -52,36 +60,73 @@
      PROCESS_PARAM_MAP.put(7, "ALARM");
   }
   // TelemetryDataMessage telemetryDataMessage
   public void handleCollectData(List<TelemetryData> telemetryDataList) {
      for (TelemetryData dt : telemetryDataList) {
         handleOneWorkstation(dt);
         sendRealtimeDataMsg(dt);
      }
   }
   void sendRealtimeDataMsg(TelemetryData dt) {
      if(ObjectUtil.isEmpty(dt.getDataItems())){
         return;
      }
      List<TypedTelemetryData> dataList= new ArrayList<>();
      String propertyName;
      for (TelemetryDataItem dataItem : dt.getDataItems()) {
         for (Map<String, String> point : dataItem.getDataPoints()) {
              String[] keys = point.keySet().toArray(new String[0]);
              for(int i=0;i<keys.length;i++) {
                 TypedTelemetryData tpData = new TypedTelemetryData();
                 propertyName = keys[i];
                 tpData.setTime(dataItem.getTime());
                 tpData.setName(propertyName);
                 tpData.setValue(point.get(propertyName));
                 dataList.add(tpData);
              }
         }
      }
      //发送mqtt消息,通知mdc消息来了
      for(TypedTelemetryData item : dataList) {
         JSONObject json = new JSONObject();
         json.put("workstationId",dt.getWorkstationId());
         json.put("name", item.getName());
         json.put("value", item.getValue());
         json.put("time", item.getTime());
         mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
      }
   }
   /**
    * 处理一个工位的数据解析入库
    * @param dt
    */
   void handleOneWorkstation(TelemetryData dt) {
      String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
      // 挂载模板
      //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
      List<MeasurementSchema> schemas = new ArrayList<>();
      String deviceId;
      
      //long workstationId = dpVo.getWorkstationId();
      List<MeasurementSchema> schemas = new ArrayList<>();
      schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
      schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
      schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
      
      int rowIndex = 0;
      
      Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
      
      String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
      String name;
@@ -90,54 +135,132 @@
         List<TypedTelemetryData> typeList = processParamsMap.get(name);
         
         deviceId = generateDeviceId(dt.getWorkstationId(),name);
         System.out.println("deivcdid="+deviceId);
         iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId);
         //System.out.println("deivcdid="+deviceId);
         iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
         Tablet tablet = new Tablet(deviceId, schemas);
         for(TypedTelemetryData tdata : typeList) {
            rowIndex = tablet.rowSize++;
            tablet.addTimestamp(rowIndex, tdata.getTime());
            tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
            tablet.addValue("n",rowIndex,tdata.getName());
              tablet.addValue("v",rowIndex,tdata.getValue());
            if(!tdata.getName().equals("Alarm")) {
               tablet.addValue("v",rowIndex,tdata.getValue());
            }else {
               //告警信息根据原版需要处理一下,格式为json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
               tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
            }
         }
         
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            //updateLastParam(dt.getWorkstationId(),typeList);
         } catch (Exception e) {
            log.error("IOTDB入库失败",e);
            e.printStackTrace();
         }
         //System.out.println(typeList);
      }
      /*
      for (TelemetryDataItem dataItem : dt.getDataItems()) {
         for (Map<String, String> point : dataItem.getDataPoints()) {
              String[] keys = point.keySet().toArray(new String[0]);
              for(int i=0;i<keys.length;i++) {
                 rowIndex = tablet.rowSize++;
                 tablet.addTimestamp(rowIndex, dataItem.getTime());
                 tablet.addValue("workstation_id",rowIndex,new Long(dt.getWorkstationId()));
                 propertyName = keys[i];
                 deviceId = generateDeviceId(dt.getWorkstationId(),propertyName);//DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId()+"_"+propertyName;
                 tablet.addValue("n",rowIndex,propertyName);
                 tablet.addValue("v",rowIndex,point.get(propertyName));
              }
         }
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         } catch (Exception e) {
            log.error("IOTDB入库失败",e);
            e.printStackTrace();
         }finally {
            //iotdbConfig.getSessionPool().clo1se();
         }
      }
      */
   }
   
   /**
    * 将报警信息格式化未json对象(原版是jsonobj或者json数组),格式为  {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
    * @param collectAlarmValue
    * @return
    */
   String formatAlarmMsg(long time,String collectAlarmValue){
      JSONObject alarmObj = new JSONObject();
      alarmObj.put("timestamp", time);
      alarmObj.put("code", "00");
      alarmObj.put("msg", collectAlarmValue);
      alarmObj.put("alarmtype", 0);
      alarmObj.put("level", "");
      return alarmObj.toJSONString();
   }
   /**
    * 填充最新参数的数据
    * @param typeList
    * @throws StatementExecutionException
    * @throws IoTDBConnectionException
    */
   void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException {
      if(typeList.isEmpty()){
         return;
      }
      long updateTime = typeList.get(0).getTime();
      List<MeasurementSchema> schemas = new ArrayList<>();
      schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
      schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
      schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
      Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
      String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
      try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
         if(dsw.hasNext()) {
            RowRecord rec = dsw.next();
            long time = rec.getTimestamp();
            String paramJsonStr = rec.getFields().get(2).getStringValue();
            tablet.rowSize = 1;
            tablet.addTimestamp(0, time);
            tablet.addValue("update_time", 0, updateTime);
            tablet.addValue("workstation_id", 0, workstationId);
            JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
            for(TypedTelemetryData tdata: typeList) {
               if(paramObj.containsKey(tdata.getName())) {
                  JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
                  itemObj.put("value", tdata.getValue());
                  itemObj.put("time", tdata.getTime());//采集时间
                  paramObj.put(tdata.getName(), itemObj);
               }else {
                  JSONObject itemObj = new JSONObject();
                  itemObj.put("value", tdata.getValue());
                  itemObj.put("time", tdata.getTime());//采集时间
                  paramObj.put(tdata.getName(), itemObj);
               }
            }
            tablet.addValue("param_json", 0, paramObj.toJSONString());
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         }else {
            //没数据,新加入一条
            tablet.rowSize = 1;
            tablet.addTimestamp(0, updateTime);
            tablet.addValue("update_time", 0, updateTime);
            tablet.addValue("workstation_id", 0, workstationId);
            JSONObject paramObj = new JSONObject();
            for(TypedTelemetryData tdata: typeList) {
               JSONObject itemObj = new JSONObject();
               itemObj.put("value", tdata.getValue());
               itemObj.put("time", tdata.getTime());//采集时间
               paramObj.put(tdata.getName(), itemObj);
            }
            tablet.addValue("param_json", 0,paramObj.toJSONString());
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         }
      }
      //dsw.close();
   }
   /**
    * 解析数据,形成name - value对类型的数据列表,并用name进行分组
    * @param dt
    * @return 按name分组后的数据
    */
   Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){
      List<TypedTelemetryData> list = new ArrayList<>();
      
@@ -166,7 +289,7 @@
   }
   
   String generateDeviceId(long workstationId,String propertyName) {
      return DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
      return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
   }
}