package com.qianwen.mdc.collect.service; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.baomidou.dynamic.datasource.annotation.DS; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.entity.iotdb.AggregateOutput; import com.qianwen.mdc.collect.entity.iotdb.Output; import com.qianwen.mdc.collect.mapper.iotdb.OutputMapper; @DS("iotdb") @Service public class OutputAggregateService{ private static final Logger log = LoggerFactory.getLogger(OutputAggregateService.class); @Autowired private IotDBCommonService iotDBCommonService; @Autowired private IotDBSessionConfig iotdbCfg; @Autowired private OutputMapper outputMapper; //@RedisLock(value = "lock:posting:output", param = "#workStationAggregateMessage.workStationId", leaseTime = 240) //public void outputAggregate(WorkstationAggregateMessage workStationAggregateMessage) { public void outputAggregate(Long workstationId) { try { String pathPprefix = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_OUTPUT+"_"+workstationId; //this.workstationAggregateOutputMapper.createTable(workStationId); //this.workstationOutputMapper.createTable(workStationId); iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT, pathPprefix); /* List notSyncWorkstationOutputs = this.workstationOutputMapper.selectList(Wrappers.lambdaQuery() .eq(WorkstationOutput::getWorkstationId, workStationId) .eq(WorkstationOutput::getIsSync, Boolean.FALSE) .orderByAsc(WorkstationOutput::getTs)); WorkstationOutput syncOutput = this.workstationOutputMapper.selectOne(Wrappers.lambdaQuery() .eq(WorkstationOutput::getWorkstationId, workStationId) .eq(WorkstationOutput::getIsSync, Boolean.TRUE)); List result = buildAggregateList(notSyncWorkstationOutputs, syncOutput); handlerAggregateOutput(notSyncWorkstationOutputs, result, workStationId); */ } catch (Exception e) { log.error("【工位:{} 产量数据同步发生异常,错误信息{}】", workstationId, e); } } public void outputRealTimeAggregate(Output output) { //单条数据聚合 try { Long workstationId = output.getWorkstationId(); crateOutputTables(workstationId); List notSyncWorkstationOutputs = outputMapper.notSyncWorkstationOutputs(workstationId); notSyncWorkstationOutputs.add(output); Output syncOutput = this.outputMapper.getLastIsSyncOutput(workstationId); List result = buildAggregateList(notSyncWorkstationOutputs, syncOutput); handlerAggregateOutput(notSyncWorkstationOutputs, result, workstationId); } catch (Exception e) { log.error("【工位:{} 产量数据实时同步发生异常,错误信息{}】", output.getWorkstationId(), e); } } /** * 以workstationId为后缀,建立产量、产量聚合表(如果不存在),表名:output_{workstationId},aggregate_output_{workstationId} * @param workstationId */ void crateOutputTables(long workstationId){ String aggregatePathPrefix = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT+"_"+workstationId; iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT, aggregatePathPrefix); String outputPathPrefix = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_OUTPUT+"_"+workstationId; iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_OUTPUT, outputPathPrefix); } private List buildAggregateList(List notSyncOutputs, Output syncOutput) { if (notSyncOutputs.isEmpty()) { return Collections.emptyList(); } List aggregateOutputList = new ArrayList<>(); notSyncOutputs = notSyncOutputs.stream().sorted(Comparator.comparing(Output::getTime)).collect(Collectors.toList()); Output prevOutput = syncOutput; for (int i = 0; i < notSyncOutputs.size(); i++) { Output currentOutput = notSyncOutputs.get(i); if (prevOutput != null) { //Long output = Long.valueOf(WorkstationParamTypeEnum.PULSE_OUTPUT.getType().equals(currentOutput.getParamType()) ? 1L : currentOutput.getValueCollect().longValue() - prevOutput.getValueCollect().longValue()); Long output = currentOutput.getValueCollect() - prevOutput.getValueCollect(); AggregateOutput aggOut = new AggregateOutput(); aggOut.setWorkstationId(currentOutput.getWorkstationId()); aggOut.setTime(currentOutput.getTime()); aggOut.setPreTime(prevOutput.getTime()); aggOut.setOutput(output); aggOut.setCurOutput(currentOutput.getValueCollect()); aggOut.setPreOutput(prevOutput.getValueCollect()); aggOut.setShiftIndex(currentOutput.getShiftIndex()); aggOut.setShiftTimeType(currentOutput.getShiftTimeType()); aggOut.setCalendarCode(currentOutput.getCalendarCode()); aggOut.setFactoryYear(currentOutput.getFactoryYear()); aggOut.setFactoryMonth(currentOutput.getFactoryMonth()); aggOut.setFactoryWeek(currentOutput.getFactoryWeek()); aggOut.setFactoryDate(currentOutput.getFactoryDate()); aggOut.setEmployeeId(currentOutput.getEmployeeId()); aggregateOutputList.add(aggOut); //workstationAggregateOutputList.add(AggregateOutput.builder().workstationId(currentOutput.getWorkstationId()).ts(currentOutput.getTs()).preTs(prevOutput.getTs()).program(currentOutput.getProgram()).productCode(currentOutput.getProductCode()).productName(currentOutput.getProductName()).output(output).curOutput(currentOutput.getValueCollect()).preOutput(prevOutput.getValueCollect()).shiftIndex(currentOutput.getShiftIndex()).shiftTimeType(currentOutput.getShiftTimeType()).calendarCode(currentOutput.getCalendarCode()).factoryDate(currentOutput.getFactoryDate()).factoryMonth(currentOutput.getFactoryMonth()).factoryWeek(currentOutput.getFactoryWeek()).factoryYear(currentOutput.getFactoryYear()).employeeId(currentOutput.getEmployeeId()).build()); } prevOutput = currentOutput; } return aggregateOutputList; } private void handlerAggregateOutput(List notSyncWorkstationOutputs, List result, Long workStationId) { if (!result.isEmpty()) { batchInsertAggregateOutput(result,workStationId); } if (!notSyncWorkstationOutputs.isEmpty()) { notSyncWorkstationOutputs.forEach(x -> { x.setIsSync(true); }); /* List workstationOutputList = notSyncWorkstationOutputs.stream().map(x -> { x.setIsSync(true); return x; }).collect(Collectors.toList());*/ batchInsertOutput(notSyncWorkstationOutputs, workStationId); } } private void batchInsertAggregateOutput(List list, Long workstationId) { List schemas = new ArrayList<>(); schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); schemas.add(new MeasurementSchema("output", TSDataType.INT64)); schemas.add(new MeasurementSchema("cur_output", TSDataType.INT64)); schemas.add(new MeasurementSchema("pre_output", TSDataType.INT64)); schemas.add(new MeasurementSchema("pre_time", TSDataType.INT64)); schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT)); schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32)); schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32)); schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32)); schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32)); schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32)); schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32)); schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64)); String deviceId = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT+"_"+workstationId; Tablet tablet = new Tablet(deviceId, schemas); int rowIndex = 0; for(AggregateOutput aggOut : list) { rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, aggOut.getTime()); tablet.addValue("workstation_id", rowIndex, aggOut.getWorkstationId()); tablet.addValue("output", rowIndex, aggOut.getOutput()); tablet.addValue("cur_output", rowIndex, aggOut.getCurOutput()); tablet.addValue("pre_output", rowIndex, aggOut.getPreOutput()); tablet.addValue("pre_time", rowIndex, aggOut.getPreTime()); tablet.addValue("calendar_code", rowIndex, aggOut.getCalendarCode()); tablet.addValue("factory_year", rowIndex, aggOut.getFactoryYear()); tablet.addValue("factory_month", rowIndex, aggOut.getFactoryMonth()); tablet.addValue("factory_week", rowIndex, aggOut.getFactoryWeek()); tablet.addValue("factory_date", rowIndex, aggOut.getFactoryDate()); tablet.addValue("shift_index", rowIndex, aggOut.getShiftIndex()); tablet.addValue("shift_time_type", rowIndex, aggOut.getShiftTimeType()); tablet.addValue("employee_id", rowIndex, aggOut.getEmployeeId()); } try { iotdbCfg.getSessionPool().insertAlignedTablet(tablet); } catch (Exception e) { log.error("聚合产量IODDB入库失败", e); } } private void batchInsertOutput(List list, Long workstationId) { if(list.isEmpty()) { return; } List schemas = new ArrayList<>(); schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); schemas.add(new MeasurementSchema("value_collect", TSDataType.INT64)); schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT)); schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32)); schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32)); schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32)); schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32)); schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32)); schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32)); schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN)); schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64)); String deviceId = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_OUTPUT+"_"+workstationId; Tablet tablet = new Tablet(deviceId, schemas); int rowIndex = 0; for(Output output : list) { rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, output.getTime()); tablet.addValue("workstation_id", rowIndex, output.getWorkstationId()); tablet.addValue("value_collect", rowIndex, output.getValueCollect()); tablet.addValue("calendar_code", rowIndex, output.getCalendarCode()); tablet.addValue("factory_year", rowIndex, output.getFactoryYear()); tablet.addValue("factory_month", rowIndex, output.getFactoryMonth()); tablet.addValue("factory_week", rowIndex, output.getFactoryWeek()); tablet.addValue("factory_date", rowIndex, output.getFactoryDate()); tablet.addValue("shift_index", rowIndex, output.getShiftIndex()); tablet.addValue("shift_time_type", rowIndex, output.getShiftTimeType()); tablet.addValue("is_sync", rowIndex, output.getIsSync()); tablet.addValue("employee_id", rowIndex, output.getEmployeeId()); } try { iotdbCfg.getSessionPool().insertAlignedTablet(tablet); } catch (Exception e) { log.error("产量IODDB入库失败", e); } } }