| | |
| | | package com.qianwen.mdc.collect.service; |
| | | |
| | | import java.time.DayOfWeek; |
| | | import java.time.Instant; |
| | | import java.time.LocalDate; |
| | | import java.time.ZoneOffset; |
| | | import java.time.temporal.WeekFields; |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import org.apache.commons.lang3.time.FastDateFormat; |
| | | import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| | | import org.apache.iotdb.tsfile.write.record.Tablet; |
| | | import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import com.qianwen.mdc.collect.config.IotDBSessionConfig; |
| | | import com.google.common.collect.Sets; |
| | | import com.qianwen.mdc.collect.cache.TimeSliceCache; |
| | | import com.qianwen.mdc.collect.cache.WorkstationCache; |
| | | import com.qianwen.mdc.collect.constants.CommonConstant; |
| | | import com.qianwen.mdc.collect.domain.TelemetryData; |
| | | import com.qianwen.mdc.collect.domain.TelemetryDataItem; |
| | | import com.qianwen.mdc.collect.dto.CacheBuildDTO; |
| | | import com.qianwen.mdc.collect.dto.CalendarShiftTimeSlicesDTO; |
| | | import com.qianwen.mdc.collect.dto.PackedTelemetryData; |
| | | import com.qianwen.mdc.collect.handler.PackedTelemetryDataHandlerSelector; |
| | | import com.qianwen.mdc.collect.handler.TelemetryDataHandler; |
| | | import com.qianwen.mdc.collect.utils.LocalDateTimeUtils; |
| | | |
| | | import cn.hutool.core.date.DatePattern; |
| | | import cn.hutool.core.date.DateUtil; |
| | | import cn.hutool.core.util.ObjectUtil; |
| | | |
| | | /** |
| | | * 采集数据填充,聚合和入库 |
| | |
| | | public class PackedDataService { |
| | | private static final Logger log = LoggerFactory.getLogger(PackedDataService.class); |
| | | |
| | | private String DB_PREFIX = "root.f2."; |
| | | private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); |
| | | @Autowired |
| | | private IotDBSessionConfig iotdbConfig; |
| | | @Autowired |
| | | private IotDBCommonService iotDBCommonService; |
| | | |
| | | @Autowired |
| | | private PackedTelemetryDataHandlerSelector handlerSelector; |
| | | |
| | | private static String TEMPLATE_NAME = "process_param"; |
| | | @Autowired |
| | | private WorkstationCache workstationCache; |
| | | @Autowired |
| | | private TimeSliceCache timeSliceCache; |
| | | |
| | | static { |
| | | PROCESS_PARAM_MAP.put(1, "STATE"); |
| | |
| | | public void handle(List<TelemetryData> telemetryDataList) { |
| | | |
| | | List<PackedTelemetryData> pdataList = packetData(telemetryDataList); |
| | | System.out.print(pdataList); |
| | | //System.out.print(pdataList); |
| | | for (PackedTelemetryData pdt : pdataList) { |
| | | TelemetryDataHandler handler = handlerSelector.select(pdt.getParamType()); |
| | | if (handler == null) { |
| | |
| | | } |
| | | |
| | | /** |
| | | * 数据填充,(calendarCode,factoryYear,month,date,week) |
| | | * 数据填充,日历代码之类的统计相关字段初始化(calendarCode,factoryYear,month,date,week) |
| | | * |
| | | * @param telemetryDataList |
| | | * @return |
| | |
| | | |
| | | String[] keys = map.keySet().toArray(new String[0]); |
| | | for (int j = 0; j < keys.length; j++) { |
| | | |
| | | PackedTelemetryData pkData = new PackedTelemetryData(); |
| | | pkData.setWorkstationId(tdata.getWorkstationId()); |
| | | pkData.setValue(map.get(keys[j])); |
| | | pkData.setTime(item.getTime()); |
| | | |
| | | pkData.setName(keys[j]);//参数名称 |
| | | pkData.setName(keys[j]);//数据点名称 |
| | | |
| | | fillByCalendar(pkData); |
| | | |
| | |
| | | * |
| | | * @param pdata |
| | | */ |
| | | void fillByCalendar(PackedTelemetryData pdata) { |
| | | public void fillByCalendar(PackedTelemetryData pdata) { |
| | | //原来在TelemetryPropertyWrapper.packWorkstationCalendarInfo中完成 |
| | | //FIXME: 填写实际的数据还需要后续处理 |
| | | pdata.setCalendarCode("1");//临时,目前库内都是1 |
| | | pdata.setShiftIndex(1);//临时 |
| | | pdata.setShiftTimeType(1);//临时 |
| | | |
| | | Date collectTime = new Date(pdata.getTime()); |
| | | String calendarCode = workstationCache.getWorkstationCalendarCodeForDate(pdata.getWorkstationId(), DateUtil.formatDate(DateUtil.date(collectTime))); |
| | | if (ObjectUtil.isNotEmpty(calendarCode)) { |
| | | pdata.setCalendarCode(calendarCode); |
| | | //telemetryData.setCalendarCode(calendarCode); |
| | | } else { |
| | | //telemetryData.setCalendarCode("#default#"); |
| | | pdata.setCalendarCode(CommonConstant.DEFAULT_CODE); |
| | | } |
| | | |
| | | // TelemetryPropertyWrapper中packWorkstationCalendarInfo |
| | | Date collectTime = new Date(pdata.getTime()); |
| | | //------------start |
| | | boolean isDefaultCalendar = true; |
| | | if (!CommonConstant.DEFAULT_CODE.equals(pdata.getCalendarCode())) { |
| | | CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(pdata.getCalendarCode(), collectTime);//从redis中获得日历的时间切片 |
| | | if (ObjectUtil.isEmpty(calendarShiftTimeSlicesDTO)) {//如果没有时间切片,则使用TimeSliceCache.build(cacheBuildDTO);构建 |
| | | LocalDate targetDate = Instant.ofEpochMilli(pdata.getTime()).atZone(ZoneOffset.systemDefault()).toLocalDate(); |
| | | CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).calendarCode(calendarCode).targetDate(targetDate).build(); |
| | | timeSliceCache.build(cacheBuildDTO); |
| | | calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(pdata.getCalendarCode(), collectTime); |
| | | } |
| | | if (ObjectUtil.isNotEmpty(calendarShiftTimeSlicesDTO)) { |
| | | pdata.setShiftIndex(calendarShiftTimeSlicesDTO.getShiftIndex()).setShiftTimeType(Integer.valueOf(calendarShiftTimeSlicesDTO.getShiftTimeType())).setFactoryDate(Integer.valueOf(calendarShiftTimeSlicesDTO.getFactoryDate().replaceAll("-", ""))).setFactoryWeek(calendarShiftTimeSlicesDTO.getFactoryWeek()).setFactoryMonth(calendarShiftTimeSlicesDTO.getFactoryMonth()).setFactoryYear(calendarShiftTimeSlicesDTO.getFactoryYear()); |
| | | isDefaultCalendar = false; |
| | | } |
| | | } |
| | | |
| | | //无日历切片,使用采集时间填充factoryYear,month,date,week几个属性 |
| | | if (isDefaultCalendar) { |
| | | log.info("无日历切片"); |
| | | LocalDate localDate = Instant.ofEpochMilli(pdata.getTime().longValue()).atZone(ZoneOffset.systemDefault()).toLocalDate(); |
| | | pdata.setFactoryDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(collectTime))) |
| | | .setFactoryYear(DateUtil.year(collectTime)) |
| | | .setFactoryWeek(LocalDateTimeUtils.getWeek(localDate)) |
| | | .setFactoryMonth(DateUtil.month(collectTime) + 1) |
| | | .setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX) |
| | | .setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE); |
| | | } |
| | | |
| | | //----------------------end |
| | | /* |
| | | |
| | | LocalDate localDate = Instant.ofEpochMilli(pdata.getTime()).atZone(ZoneOffset.systemDefault()).toLocalDate(); |
| | | |
| | | String PURE_DATE_PATTERN = "yyyyMMdd"; |
| | | /** 标准日期格式 {@link FastDateFormat}:yyyyMMdd */ |
| | | |
| | | FastDateFormat PURE_DATE_FORMAT = FastDateFormat.getInstance(PURE_DATE_PATTERN); |
| | | |
| | | WeekFields weekFields = WeekFields.of(DayOfWeek.MONDAY, 1); |
| | |
| | | pdata.setFactoryYear(localDate.getYear()).setFactoryMonth(localDate.getMonthValue()) |
| | | .setFactoryDate(Integer.valueOf(PURE_DATE_FORMAT.format(collectTime))) |
| | | .setFactoryWeek(localDate.get(weekFields.weekOfYear())); |
| | | */ |
| | | } |
| | | |
| | | void handleOneWorkstationXX(TelemetryData dt) { |
| | | String deviceId = DB_PREFIX + TEMPLATE_NAME + "_" + dt.getWorkstationId(); |
| | | |
| | | // 挂载模板 |
| | | iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId); |
| | | |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | |
| | | schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); |
| | | schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); |
| | | schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); |
| | | |
| | | Tablet tablet = new Tablet(deviceId, schemas); |
| | | int rowIndex = 0; |
| | | for (TelemetryDataItem dataItem : dt.getDataItems()) { |
| | | |
| | | for (Map<String, String> point : dataItem.getDataPoints()) { |
| | | |
| | | String[] keys = point.keySet().toArray(new String[0]); |
| | | for (int i = 0; i < keys.length; i++) { |
| | | rowIndex = tablet.rowSize++; |
| | | tablet.addTimestamp(rowIndex, dataItem.getTime()); |
| | | tablet.addValue("workstation_id", rowIndex, new Long(dt.getWorkstationId())); |
| | | tablet.addValue("n", rowIndex, keys[i]); |
| | | tablet.addValue("v", rowIndex, point.get(keys[i])); |
| | | } |
| | | |
| | | } |
| | | |
| | | try { |
| | | iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } catch (Exception e) { |
| | | log.error("IODDB入库失败", e); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |