package com.qianwen.mdc.collect.service;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map.Entry;
|
import java.util.stream.Collectors;
|
|
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
import org.apache.iotdb.tsfile.write.record.Tablet;
|
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.qianwen.core.tool.utils.Func;
|
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
|
import com.qianwen.mdc.collect.constants.IOTDBConstant;
|
import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO;
|
import com.qianwen.mdc.collect.entity.iotdb.AggregateState;
|
import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
|
import com.qianwen.mdc.collect.mapper.iotdb.AggregateStateMapper;
|
import com.qianwen.mdc.collect.mapper.iotdb.DeviceStateMapper;
|
import com.qianwen.mdc.collect.utils.WorkstationStateAggregateNoFeedbackUtils;
|
|
/**
|
* 非反馈点 状态聚合服务
|
*/
|
@Service
|
public class DeviceStateAggregateNoFeedbackService {
|
private static final Logger log = LoggerFactory.getLogger(DeviceStateAggregateNoFeedbackService.class);
|
@Autowired
|
private DeviceStateMapper stateMapper;
|
@Autowired
|
private AggregateStateMapper workstationAggregateStateMapper;
|
@Autowired
|
private IotDBSessionConfig iotdbConfig;
|
@Autowired
|
private IotDBCommonService iotDBCommonService;
|
|
private static final int MAX_COUNT = 1000;
|
|
public List<AggregateState> stateAggregateForSpecialTimeRange(Long workstationId, StateAggregateTimeDTO timeRange, List<DeviceState> effectiveStateList) {
|
//按timeRange查询时间区间内的状态数据,除了已删除的,其他数据都查出来了
|
/*
|
List<DeviceState> notSyncWorkstationStates = this.stateMapper.selectList(Wrappers.<DeviceState>lambdaQuery()
|
.eq(DeviceState::getWorkstationId, workstationId).and(wrapper -> {
|
LambdaQueryWrapper lambdaQueryWrapper = wrapper.eq(DeviceState::getIsDeleted, Boolean.FALSE).or().isNull(DeviceState::getIsDeleted);
|
}).ge(Func.isNotEmpty(timeRange.getStartTime()), DeviceState::getTime, timeRange.getStartTime())
|
.le(DeviceState::getTime, timeRange.getEndTime()).orderByAsc(DeviceState::getTime));
|
*/
|
List<DeviceState> notSyncWorkstationStates = stateMapper.statesInTimeRange(workstationId,timeRange);
|
effectiveStateList.addAll(notSyncWorkstationStates);//干嘛的?返回给调用者了?
|
List<AggregateState> result = WorkstationStateAggregateNoFeedbackUtils.buildAggregateList(notSyncWorkstationStates);
|
return result;
|
}
|
|
/**
|
* 过滤出有效聚合状态数据并保存
|
* @param result
|
* @param workstationId
|
* @param effectTimeRangeList
|
*/
|
public void handlerAggregateState(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) {
|
if(result.isEmpty()) {
|
return;
|
}
|
|
List<AggregateState> aggStates = getFinallyAggregateStateList(result, workstationId, timeRange);
|
|
List<MeasurementSchema> schemas = new ArrayList<>();
|
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("value_collect", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("end_time", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("duration_collect", TSDataType.INT64));
|
|
schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT));
|
schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("wcs", TSDataType.INT32));
|
schemas.add(new MeasurementSchema("rps", TSDataType.INT32));
|
//schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN));
|
schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32));//TODO 这个属性应该是GlobalWcsOfRps中的值,如何填写?
|
|
schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN));
|
schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64));
|
|
//命名:aggregate_state_{workstation_id}
|
String deviceId = IOTDBConstant.DB_PREFIX+"aggregate_state_"+workstationId;
|
|
iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATESTATE, deviceId);//挂载模板
|
|
Tablet tablet = new Tablet(deviceId, schemas);
|
|
AggregateState aggState;
|
int tblIndex = -1;
|
|
for(int i=0;i<aggStates.size();i++) {
|
aggState = aggStates.get(i);
|
tblIndex = tablet.rowSize++;
|
tablet.addTimestamp(tblIndex, aggState.getTime());
|
tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId());
|
tablet.addValue("value_collect", tblIndex, aggState.getValueCollect());
|
tablet.addValue("end_time", tblIndex, aggState.getEndTime());
|
tablet.addValue("duration_collect", tblIndex, aggState.getDurationCollect());
|
tablet.addValue("calendar_code", tblIndex, aggState.getCalendarCode());
|
tablet.addValue("factory_year", tblIndex, aggState.getFactoryYear());
|
tablet.addValue("factory_month", tblIndex, aggState.getFactoryMonth());
|
tablet.addValue("factory_week", tblIndex, aggState.getFactoryWeek());
|
tablet.addValue("factory_date", tblIndex, aggState.getFactoryDate());
|
tablet.addValue("shift_index", tblIndex, aggState.getShiftIndex());
|
tablet.addValue("shift_time_type", tblIndex, aggState.getShiftTimeType());
|
tablet.addValue("wcs", tblIndex, aggState.getWcs());
|
tablet.addValue("rps", tblIndex, aggState.getRps());
|
|
tablet.addValue("is_plan", tblIndex, aggState.getIsPlan());
|
tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted());
|
tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId());
|
|
if(aggState.getWorkstationId() == 1656819337286631426L) {
|
System.out.println("laile"+aggState.getWorkstationId());
|
}
|
//tblIndex++;
|
if(tblIndex >= MAX_COUNT) {
|
try {
|
//每个工位批量插入一次数据
|
this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
|
log.info("保存聚合状态完成tblIndex={}",tblIndex);
|
//tablet.rowSize = 0;
|
tablet.reset();
|
tblIndex = 0;
|
} catch (Exception e) {
|
log.error("保存固定点数据异常",e);
|
}
|
}
|
|
}
|
|
if(tablet.rowSize > 0) {
|
try {
|
//每个工位批量插入一次数据
|
this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
|
log.info("保存聚合状态完成finaltblIndex={}",tblIndex);
|
tablet.reset();
|
tblIndex = -1;
|
} catch (Exception e) {
|
log.error("保存固定点数据异常",e);
|
}
|
|
}
|
|
|
}
|
|
|
private List<AggregateState> getFinallyAggregateStateList(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) {
|
/*
|
List<StateAggregateTimeDTO> effectTimeRangeList2 = effectTimeRangeList.stream().filter(x -> {
|
return Boolean.TRUE.equals(x.getNoFeedbackTime());
|
}).collect(Collectors.toList());
|
*/
|
List<AggregateState> finallyResult = new ArrayList<>();
|
//for (StateAggregateTimeDTO item : effectTimeRangeList2) {
|
/*
|
List<AggregateState> temp = this.workstationAggregateStateMapper.selectList(Wrappers.<AggregateState>lambdaQuery()
|
.eq(AggregateState::getWorkstationId, workstationId).and(wrapper -> {
|
LambdaQueryWrapper lambdaQueryWrapper = wrapper.eq(AggregateState::getIsDeleted, Boolean.FALSE).or().isNull(AggregateState::getIsDeleted);
|
}).
|
ge(Func.isNotEmpty(timeRange.getStartTime()), AggregateState::getTime, timeRange.getStartTime()).le(AggregateState::getTime, timeRange.getEndTime()).orderByAsc(AggregateState::getTime));
|
*/
|
List<AggregateState> temp = this.workstationAggregateStateMapper.aggregateStatesInTimeRange(workstationId, timeRange);
|
if (Func.isNotEmpty(temp)) {
|
finallyResult.addAll(temp);
|
}
|
//}
|
|
List<AggregateState> deleteData = finallyResult.stream().map(x2 -> {
|
x2.setIsDeleted(Boolean.TRUE);
|
return x2;
|
}).collect(Collectors.toList());
|
return WorkstationStateAggregateNoFeedbackUtils.getReallyEffectWorkstationStateAggregateNoFeedbackList(result, deleteData);
|
}
|
}
|