package com.qianwen.mdc.collect.service;
|
|
import java.util.ArrayList;
|
import java.util.Collections;
|
import java.util.Comparator;
|
import java.util.List;
|
import java.util.stream.Collectors;
|
|
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
import org.apache.iotdb.tsfile.write.record.Tablet;
|
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
|
import com.qianwen.mdc.collect.constants.IOTDBConstant;
|
import com.qianwen.mdc.collect.entity.iotdb.AggregateOutput;
|
import com.qianwen.mdc.collect.entity.iotdb.Output;
|
import com.qianwen.mdc.collect.mapper.iotdb.OutputMapper;
|
|
@DS("iotdb")
|
@Service
|
public class OutputAggregateService{
|
private static final Logger log = LoggerFactory.getLogger(OutputAggregateService.class);
|
@Autowired
|
private IotDBCommonService iotDBCommonService;
|
@Autowired
|
private IotDBSessionConfig iotdbCfg;
|
@Autowired
|
private OutputMapper outputMapper;
|
|
|
|
//@RedisLock(value = "lock:posting:output", param = "#workStationAggregateMessage.workStationId", leaseTime = 240)
|
//public void outputAggregate(WorkstationAggregateMessage workStationAggregateMessage) {
|
public void outputAggregate(Long workstationId) {
|
try {
|
String pathPprefix = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_OUTPUT+"_"+workstationId;
|
//this.workstationAggregateOutputMapper.createTable(workStationId);
|
//this.workstationOutputMapper.createTable(workStationId);
|
iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT, pathPprefix);
|
/*
|
List<WorkstationOutput> notSyncWorkstationOutputs = this.workstationOutputMapper.selectList(Wrappers.<WorkstationOutput>lambdaQuery()
|
.eq(WorkstationOutput::getWorkstationId, workStationId)
|
.eq(WorkstationOutput::getIsSync, Boolean.FALSE)
|
.orderByAsc(WorkstationOutput::getTs));
|
WorkstationOutput syncOutput = this.workstationOutputMapper.selectOne(Wrappers.<WorkstationOutput>lambdaQuery()
|
.eq(WorkstationOutput::getWorkstationId, workStationId)
|
.eq(WorkstationOutput::getIsSync, Boolean.TRUE));
|
List<WorkstationAggregateOutput> result = buildAggregateList(notSyncWorkstationOutputs, syncOutput);
|
handlerAggregateOutput(notSyncWorkstationOutputs, result, workStationId);
|
*/
|
} catch (Exception e) {
|
log.error("【工位:{} 产量数据同步发生异常,错误信息{}】", workstationId, e);
|
}
|
}
|
|
|
public void outputRealTimeAggregate(Output output) {
|
//单条数据聚合
|
try {
|
Long workstationId = output.getWorkstationId();
|
|
crateOutputTables(workstationId);
|
|
List<Output> notSyncWorkstationOutputs = outputMapper.notSyncWorkstationOutputs(workstationId);
|
|
notSyncWorkstationOutputs.add(output);
|
Output syncOutput = this.outputMapper.getLastIsSyncOutput(workstationId);
|
|
List<AggregateOutput> result = buildAggregateList(notSyncWorkstationOutputs, syncOutput);
|
|
handlerAggregateOutput(notSyncWorkstationOutputs, result, workstationId);
|
|
} catch (Exception e) {
|
log.error("【工位:{} 产量数据实时同步发生异常,错误信息{}】", output.getWorkstationId(), e);
|
}
|
}
|
|
/**
|
* 以workstationId为后缀,建立产量、产量聚合表(如果不存在),表名:output_{workstationId},aggregate_output_{workstationId}
|
* @param workstationId
|
*/
|
void crateOutputTables(long workstationId){
|
String aggregatePathPrefix = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT+"_"+workstationId;
|
iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT, aggregatePathPrefix);
|
|
String outputPathPrefix = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_OUTPUT+"_"+workstationId;
|
iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_OUTPUT, outputPathPrefix);
|
|
}
|
|
|
private List<AggregateOutput> buildAggregateList(List<Output> notSyncOutputs, Output syncOutput) {
|
|
if (notSyncOutputs.isEmpty()) {
|
return Collections.emptyList();
|
}
|
List<AggregateOutput> aggregateOutputList = new ArrayList<>();
|
notSyncOutputs = notSyncOutputs.stream().sorted(Comparator.comparing(Output::getTime)).collect(Collectors.toList());
|
Output prevOutput = syncOutput;
|
for (int i = 0; i < notSyncOutputs.size(); i++) {
|
Output currentOutput = notSyncOutputs.get(i);
|
if (prevOutput != null) {
|
//Long output = Long.valueOf(WorkstationParamTypeEnum.PULSE_OUTPUT.getType().equals(currentOutput.getParamType()) ? 1L : currentOutput.getValueCollect().longValue() - prevOutput.getValueCollect().longValue());
|
|
Long output = currentOutput.getValueCollect() - prevOutput.getValueCollect();
|
|
AggregateOutput aggOut = new AggregateOutput();
|
aggOut.setWorkstationId(currentOutput.getWorkstationId());
|
aggOut.setTime(currentOutput.getTime());
|
aggOut.setPreTime(prevOutput.getTime());
|
aggOut.setOutput(output);
|
|
aggOut.setCurOutput(currentOutput.getValueCollect());
|
aggOut.setPreOutput(prevOutput.getValueCollect());
|
aggOut.setShiftIndex(currentOutput.getShiftIndex());
|
aggOut.setShiftTimeType(currentOutput.getShiftTimeType());
|
aggOut.setCalendarCode(currentOutput.getCalendarCode());
|
aggOut.setFactoryYear(currentOutput.getFactoryYear());
|
aggOut.setFactoryMonth(currentOutput.getFactoryMonth());
|
aggOut.setFactoryWeek(currentOutput.getFactoryWeek());
|
aggOut.setFactoryDate(currentOutput.getFactoryDate());
|
aggOut.setEmployeeId(currentOutput.getEmployeeId());
|
|
aggregateOutputList.add(aggOut);
|
//workstationAggregateOutputList.add(AggregateOutput.builder().workstationId(currentOutput.getWorkstationId()).ts(currentOutput.getTs()).preTs(prevOutput.getTs()).program(currentOutput.getProgram()).productCode(currentOutput.getProductCode()).productName(currentOutput.getProductName()).output(output).curOutput(currentOutput.getValueCollect()).preOutput(prevOutput.getValueCollect()).shiftIndex(currentOutput.getShiftIndex()).shiftTimeType(currentOutput.getShiftTimeType()).calendarCode(currentOutput.getCalendarCode()).factoryDate(currentOutput.getFactoryDate()).factoryMonth(currentOutput.getFactoryMonth()).factoryWeek(currentOutput.getFactoryWeek()).factoryYear(currentOutput.getFactoryYear()).employeeId(currentOutput.getEmployeeId()).build());
|
}
|
prevOutput = currentOutput;
|
}
|
|
return aggregateOutputList;
|
}
|
|
|
private void handlerAggregateOutput(List<Output> notSyncWorkstationOutputs, List<AggregateOutput> result, Long workStationId) {
|
if (!result.isEmpty()) {
|
batchInsertAggregateOutput(result,workStationId);
|
}
|
if (!notSyncWorkstationOutputs.isEmpty()) {
|
notSyncWorkstationOutputs.forEach(x -> {
|
x.setIsSync(true);
|
});
|
/*
|
List<Output> workstationOutputList = notSyncWorkstationOutputs.stream().map(x -> {
|
x.setIsSync(true);
|
return x;
|
}).collect(Collectors.toList());*/
|
batchInsertOutput(notSyncWorkstationOutputs, workStationId);
|
}
|
}
|
|
private void batchInsertAggregateOutput(List<AggregateOutput> list, Long workstationId) {
|
|
List<MeasurementSchema> schemas = new ArrayList<>();
|
|
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("output", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("cur_output", TSDataType.INT64));
|
|
schemas.add(new MeasurementSchema("pre_output", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("pre_time", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT));
|
schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64));
|
|
String deviceId = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_AGGREGATEOUTPUT+"_"+workstationId;
|
Tablet tablet = new Tablet(deviceId, schemas);
|
|
int rowIndex = 0;
|
for(AggregateOutput aggOut : list) {
|
rowIndex = tablet.rowSize++;
|
|
tablet.addTimestamp(rowIndex, aggOut.getTime());
|
tablet.addValue("workstation_id", rowIndex, aggOut.getWorkstationId());
|
tablet.addValue("output", rowIndex, aggOut.getOutput());
|
tablet.addValue("cur_output", rowIndex, aggOut.getCurOutput());
|
tablet.addValue("pre_output", rowIndex, aggOut.getPreOutput());
|
|
tablet.addValue("pre_time", rowIndex, aggOut.getPreTime());
|
tablet.addValue("calendar_code", rowIndex, aggOut.getCalendarCode());
|
tablet.addValue("factory_year", rowIndex, aggOut.getFactoryYear());
|
tablet.addValue("factory_month", rowIndex, aggOut.getFactoryMonth());
|
tablet.addValue("factory_week", rowIndex, aggOut.getFactoryWeek());
|
tablet.addValue("factory_date", rowIndex, aggOut.getFactoryDate());
|
|
tablet.addValue("shift_index", rowIndex, aggOut.getShiftIndex());
|
tablet.addValue("shift_time_type", rowIndex, aggOut.getShiftTimeType());
|
tablet.addValue("employee_id", rowIndex, aggOut.getEmployeeId());
|
}
|
|
try {
|
iotdbCfg.getSessionPool().insertAlignedTablet(tablet);
|
} catch (Exception e) {
|
log.error("聚合产量IODDB入库失败", e);
|
}
|
}
|
|
private void batchInsertOutput(List<Output> list, Long workstationId) {
|
|
if(list.isEmpty()) {
|
return;
|
}
|
|
List<MeasurementSchema> schemas = new ArrayList<>();
|
|
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("value_collect", TSDataType.INT64));
|
|
schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT));
|
schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN));
|
schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64));
|
|
String deviceId = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_OUTPUT+"_"+workstationId;
|
Tablet tablet = new Tablet(deviceId, schemas);
|
int rowIndex = 0;
|
for(Output output : list) {
|
rowIndex = tablet.rowSize++;
|
|
tablet.addTimestamp(rowIndex, output.getTime());
|
tablet.addValue("workstation_id", rowIndex, output.getWorkstationId());
|
tablet.addValue("value_collect", rowIndex, output.getValueCollect());
|
|
tablet.addValue("calendar_code", rowIndex, output.getCalendarCode());
|
tablet.addValue("factory_year", rowIndex, output.getFactoryYear());
|
tablet.addValue("factory_month", rowIndex, output.getFactoryMonth());
|
tablet.addValue("factory_week", rowIndex, output.getFactoryWeek());
|
tablet.addValue("factory_date", rowIndex, output.getFactoryDate());
|
|
tablet.addValue("shift_index", rowIndex, output.getShiftIndex());
|
tablet.addValue("shift_time_type", rowIndex, output.getShiftTimeType());
|
tablet.addValue("is_sync", rowIndex, output.getIsSync());
|
tablet.addValue("employee_id", rowIndex, output.getEmployeeId());
|
}
|
|
try {
|
iotdbCfg.getSessionPool().insertAlignedTablet(tablet);
|
} catch (Exception e) {
|
log.error("产量IODDB入库失败", e);
|
}
|
|
}
|
}
|