yangys
2025-11-24 11d4be720620abf502d35000e2ed40d30c4023bf
collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
@@ -1,31 +1,35 @@
package com.qianwen.mdc.collect.service;
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.WeekFields;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.google.common.collect.Sets;
import com.qianwen.mdc.collect.cache.TimeSliceCache;
import com.qianwen.mdc.collect.cache.WorkstationCache;
import com.qianwen.mdc.collect.constants.CommonConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.dto.CacheBuildDTO;
import com.qianwen.mdc.collect.dto.CalendarShiftTimeSlicesDTO;
import com.qianwen.mdc.collect.dto.PackedTelemetryData;
import com.qianwen.mdc.collect.handler.PackedTelemetryDataHandlerSelector;
import com.qianwen.mdc.collect.handler.TelemetryDataHandler;
import com.qianwen.mdc.collect.utils.LocalDateTimeUtils;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
/**
 * 采集数据填充,聚合和入库
@@ -34,16 +38,15 @@
public class PackedDataService {
   private static final Logger log = LoggerFactory.getLogger(PackedDataService.class);
   private String DB_PREFIX = "root.f2.";
   private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
   @Autowired
   private IotDBSessionConfig iotdbConfig;
   @Autowired
   private IotDBCommonService iotDBCommonService;
   @Autowired
   private PackedTelemetryDataHandlerSelector handlerSelector;
   
   private static String TEMPLATE_NAME = "process_param";
   @Autowired
   private WorkstationCache workstationCache;
   @Autowired
   private TimeSliceCache timeSliceCache;
   static {
      PROCESS_PARAM_MAP.put(1, "STATE");
@@ -58,7 +61,7 @@
   public void handle(List<TelemetryData> telemetryDataList) {
      List<PackedTelemetryData> pdataList = packetData(telemetryDataList);
      System.out.print(pdataList);
      //System.out.print(pdataList);
      for (PackedTelemetryData pdt : pdataList) {
         TelemetryDataHandler handler = handlerSelector.select(pdt.getParamType());
          if (handler == null) {
@@ -70,7 +73,7 @@
   }
   /**
    * 数据填充,(calendarCode,factoryYear,month,date,week)
    * 数据填充,日历代码之类的统计相关字段初始化(calendarCode,factoryYear,month,date,week)
    * 
    * @param telemetryDataList
    * @return
@@ -87,12 +90,13 @@
               String[] keys = map.keySet().toArray(new String[0]);
               for (int j = 0; j < keys.length; j++) {
                  PackedTelemetryData pkData = new PackedTelemetryData();
                  pkData.setWorkstationId(tdata.getWorkstationId());
                  pkData.setValue(map.get(keys[j]));
                  pkData.setTime(item.getTime());
                  pkData.setName(keys[j]);//参数名称
                  pkData.setName(keys[j]);//数据点名称
                        
                  fillByCalendar(pkData);
@@ -111,19 +115,56 @@
    * 
    * @param pdata
    */
   void fillByCalendar(PackedTelemetryData pdata) {
   public void fillByCalendar(PackedTelemetryData pdata) {
      //原来在TelemetryPropertyWrapper.packWorkstationCalendarInfo中完成
      //FIXME: 填写实际的数据还需要后续处理
      pdata.setCalendarCode("1");//临时,目前库内都是1
      pdata.setShiftIndex(1);//临时
      pdata.setShiftTimeType(1);//临时
      Date collectTime = new Date(pdata.getTime());
      String calendarCode = workstationCache.getWorkstationCalendarCodeForDate(pdata.getWorkstationId(), DateUtil.formatDate(DateUtil.date(collectTime)));
      if (ObjectUtil.isNotEmpty(calendarCode)) {
         pdata.setCalendarCode(calendarCode);
            //telemetryData.setCalendarCode(calendarCode);
        } else {
            //telemetryData.setCalendarCode("#default#");
            pdata.setCalendarCode(CommonConstant.DEFAULT_CODE);
        }
      
      // TelemetryPropertyWrapper中packWorkstationCalendarInfo
      Date collectTime = new Date(pdata.getTime());
      //------------start
      boolean isDefaultCalendar = true;
        if (!CommonConstant.DEFAULT_CODE.equals(pdata.getCalendarCode())) {
            CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(pdata.getCalendarCode(), collectTime);//从redis中获得日历的时间切片
            if (ObjectUtil.isEmpty(calendarShiftTimeSlicesDTO)) {//如果没有时间切片,则使用TimeSliceCache.build(cacheBuildDTO);构建
                LocalDate targetDate = Instant.ofEpochMilli(pdata.getTime()).atZone(ZoneOffset.systemDefault()).toLocalDate();
                CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).calendarCode(calendarCode).targetDate(targetDate).build();
                timeSliceCache.build(cacheBuildDTO);
                calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(pdata.getCalendarCode(), collectTime);
            }
            if (ObjectUtil.isNotEmpty(calendarShiftTimeSlicesDTO)) {
               pdata.setShiftIndex(calendarShiftTimeSlicesDTO.getShiftIndex()).setShiftTimeType(Integer.valueOf(calendarShiftTimeSlicesDTO.getShiftTimeType())).setFactoryDate(Integer.valueOf(calendarShiftTimeSlicesDTO.getFactoryDate().replaceAll("-", ""))).setFactoryWeek(calendarShiftTimeSlicesDTO.getFactoryWeek()).setFactoryMonth(calendarShiftTimeSlicesDTO.getFactoryMonth()).setFactoryYear(calendarShiftTimeSlicesDTO.getFactoryYear());
                isDefaultCalendar = false;
            }
        }
        //无日历切片,使用采集时间填充factoryYear,month,date,week几个属性
        if (isDefaultCalendar) {
            log.info("无日历切片");
            LocalDate localDate = Instant.ofEpochMilli(pdata.getTime().longValue()).atZone(ZoneOffset.systemDefault()).toLocalDate();
            pdata.setFactoryDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(collectTime)))
            .setFactoryYear(DateUtil.year(collectTime))
            .setFactoryWeek(LocalDateTimeUtils.getWeek(localDate))
            .setFactoryMonth(DateUtil.month(collectTime) + 1)
            .setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX)
            .setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE);
        }
        //----------------------end
      /*
      LocalDate localDate = Instant.ofEpochMilli(pdata.getTime()).atZone(ZoneOffset.systemDefault()).toLocalDate();
      String PURE_DATE_PATTERN = "yyyyMMdd";
      /** 标准日期格式 {@link FastDateFormat}:yyyyMMdd */
      FastDateFormat PURE_DATE_FORMAT = FastDateFormat.getInstance(PURE_DATE_PATTERN);
      WeekFields weekFields = WeekFields.of(DayOfWeek.MONDAY, 1);
@@ -131,44 +172,7 @@
      pdata.setFactoryYear(localDate.getYear()).setFactoryMonth(localDate.getMonthValue())
            .setFactoryDate(Integer.valueOf(PURE_DATE_FORMAT.format(collectTime)))
            .setFactoryWeek(localDate.get(weekFields.weekOfYear()));
      */
   }
   void handleOneWorkstationXX(TelemetryData dt) {
      String deviceId = DB_PREFIX + TEMPLATE_NAME + "_" + dt.getWorkstationId();
      // 挂载模板
      iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
      List<MeasurementSchema> schemas = new ArrayList<>();
      schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
      schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
      schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
      Tablet tablet = new Tablet(deviceId, schemas);
      int rowIndex = 0;
      for (TelemetryDataItem dataItem : dt.getDataItems()) {
         for (Map<String, String> point : dataItem.getDataPoints()) {
            String[] keys = point.keySet().toArray(new String[0]);
            for (int i = 0; i < keys.length; i++) {
               rowIndex = tablet.rowSize++;
               tablet.addTimestamp(rowIndex, dataItem.getTime());
               tablet.addValue("workstation_id", rowIndex, new Long(dt.getWorkstationId()));
               tablet.addValue("n", rowIndex, keys[i]);
               tablet.addValue("v", rowIndex, point.get(keys[i]));
            }
         }
         try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
         } catch (Exception e) {
            log.error("IODDB入库失败", e);
            e.printStackTrace();
         }
      }
   }
}