package com.qianwen.mdc.collect.service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.qianwen.core.tool.utils.Func;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO;
import com.qianwen.mdc.collect.entity.iotdb.AggregateState;
import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
import com.qianwen.mdc.collect.mapper.iotdb.AggregateStateMapper;
import com.qianwen.mdc.collect.mapper.iotdb.DeviceStateMapper;
import com.qianwen.mdc.collect.utils.WorkstationStateAggregateNoFeedbackUtils;

/**
 * State aggregation service for non-feedback points.
 *
 * <p>Loads device state rows for a workstation and time range, turns them into
 * aggregate states through {@code WorkstationStateAggregateNoFeedbackUtils}, and
 * prepares an IoTDB {@code Tablet} for the resulting aggregate rows.
 *
 * <p>NOTE(review): the text of this file is damaged. Collection type arguments
 * appear to have been stripped (every {@code List} is raw), and part of
 * {@code handlerAggregateState} is missing (see the note inside that method).
 * Restore the file from version control before relying on it.
 */
@Service
public class DeviceStateAggregateNoFeedbackService {

    private static final Logger log = LoggerFactory.getLogger(DeviceStateAggregateNoFeedbackService.class);

    /** Mapper used to read the device state rows of a workstation. */
    @Autowired
    private DeviceStateMapper stateMapper;

    /** Mapper used to read the aggregate state rows that already exist for a workstation. */
    @Autowired
    private AggregateStateMapper workstationAggregateStateMapper;

    /**
     * IoTDB session configuration.
     * NOTE(review): no use of this field is visible in the surviving text; presumably
     * it is used by the lost tablet-write step - confirm against the original file.
     */
    @Autowired
    private IotDBSessionConfig iotdbConfig;

    /** Shared IoTDB helper; used here to attach the aggregate-state template to a device path. */
    @Autowired
    private IotDBCommonService iotDBCommonService;

    /**
     * Builds the aggregate list for the state rows of one workstation within a time range.
     *
     * @param workstationId      workstation whose state rows are queried
     * @param timeRange          time range handed to {@code stateMapper.statesInTimeRange}
     * @param effectiveStateList caller-supplied list that is mutated: every queried state
     *                           row is appended to it
     * @return the list produced by
     *         {@code WorkstationStateAggregateNoFeedbackUtils.buildAggregateList} for the queried rows
     */
    public List stateAggregateForSpecialTimeRange(Long workstationId, StateAggregateTimeDTO timeRange, List effectiveStateList) {
        // Query the state data that falls inside timeRange; per the original author's
        // note, everything except deleted rows is fetched.
        /*
         * Earlier Wrappers.lambdaQuery()-based query, left commented out by the author:
         *
         * List notSyncWorkstationStates = this.stateMapper.selectList(Wrappers.lambdaQuery()
         *         .eq(DeviceState::getWorkstationId, workstationId).and(wrapper -> {
         *             LambdaQueryWrapper lambdaQueryWrapper = wrapper.eq(DeviceState::getIsDeleted,
         *                     Boolean.FALSE).or().isNull(DeviceState::getIsDeleted);
         *         }).ge(Func.isNotEmpty(timeRange.getStartTime()), DeviceState::getTime, timeRange.getStartTime())
         *         .le(DeviceState::getTime, timeRange.getEndTime()).orderByAsc(DeviceState::getTime));
         */
        List notSyncWorkstationStates = stateMapper.statesInTimeRange(workstationId,timeRange);
        effectiveStateList.addAll(notSyncWorkstationStates);// Original author's open question: what is this for? Is it handed back to the caller?
        List result = WorkstationStateAggregateNoFeedbackUtils.buildAggregateList(notSyncWorkstationStates);
        return result;
    }

    /**
     * Filters out the effective aggregate state data and saves it.
     *
     * <p>Returns immediately when {@code result} is empty. {@code result} must not be
     * {@code null}, because {@code isEmpty()} is called on it directly.
     *
     * @param result        aggregate states to filter through {@code getFinallyAggregateStateList}
     * @param workstationId workstation the data belongs to; also used to build the IoTDB device path
     * @param timeRange     time range forwarded to {@code getFinallyAggregateStateList}
     */
    public void handlerAggregateState(List result, Long workstationId, StateAggregateTimeDTO timeRange) {
        /*
         * Earlier batch-save implementation, left commented out by the author:
         *
         * if (Func.isNotEmpty(result)) {
         *     Map stringListMap = CommonUtil.groupList(getFinallyAggregateStateList(result, workstationId, effectTimeRangeList), CommonConstant.MAX_RECORDS_FOR_SQL_LENGTH.intValue());
         *     stringListMap.forEach(k, v -> {
         *         this.workstationAggregateStateMapper.batchSave(workstationId, v);
         *     });
         * }
         */
        if(result.isEmpty()) {
            return;
        }

        List aggStates = getFinallyAggregateStateList(result, workstationId, timeRange);

        // Measurement columns of the tablet, in the order they are registered.
        List schemas = new ArrayList<>();
        schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("value_collect", TSDataType.INT32));
        schemas.add(new MeasurementSchema("end_time", TSDataType.INT64));
        schemas.add(new MeasurementSchema("duration_collect", TSDataType.INT64));
        schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT));
        schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32));
        schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32));
        schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32));
        schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32));
        schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32));
        schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32));
        schemas.add(new MeasurementSchema("wcs", TSDataType.INT32));
        schemas.add(new MeasurementSchema("rps", TSDataType.INT32));
        //schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN));
        schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32));//TODO this attribute should be the value from GlobalWcsOfRps; how should it be filled in?
        schemas.add(new MeasurementSchema("feedback_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN));
        schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64));

        // Device path naming: aggregate_state_{workstation_id}
        String deviceId = IOTDBConstant.DB_PREFIX+"aggregate_state_"+workstationId;
        iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATESTATE, deviceId);// mount the template

        Tablet tablet = new Tablet(deviceId, schemas);
        // One tablet row per aggregate state returned by getFinallyAggregateStateList.
        tablet.rowSize = aggStates.size();
        AggregateState aggState;
        /*
         * NOTE(review): SOURCE TEXT IS DAMAGED ON THE NEXT LINE.
         * The loop header breaks off after "i". The loop condition and body, everything
         * that followed the loop in this method, this method's closing brace, and the
         * modifiers and return type of getFinallyAggregateStateList are all missing.
         * The line is reproduced exactly as found; it does not compile and the braces of
         * the file no longer balance. Presumably the lost part filled the tablet from
         * aggStates and wrote it to IoTDB - confirm against version control.
         *
         * What is still visible of getFinallyAggregateStateList(result, workstationId, timeRange):
         * it loads the aggregate states already stored for the workstation in timeRange,
         * sets isDeleted = TRUE on each of them, and returns the result of
         * WorkstationStateAggregateNoFeedbackUtils.getReallyEffectWorkstationStateAggregateNoFeedbackList
         * for the new states and those marked rows.
         */
        for(int i=0;i getFinallyAggregateStateList(List result, Long workstationId, StateAggregateTimeDTO timeRange) {
        /*
         * Earlier filtering step, left commented out by the author:
         *
         * List effectTimeRangeList2 = effectTimeRangeList.stream().filter(x -> {
         *     return Boolean.TRUE.equals(x.getNoFeedbackTime());
         * }).collect(Collectors.toList());
         */
        List finallyResult = new ArrayList<>();
        //for (StateAggregateTimeDTO item : effectTimeRangeList2) {
        /*
         * Earlier Wrappers.lambdaQuery()-based query, left commented out by the author:
         *
         * List temp = this.workstationAggregateStateMapper.selectList(Wrappers.lambdaQuery()
         *         .eq(AggregateState::getWorkstationId, workstationId).and(wrapper -> {
         *             LambdaQueryWrapper lambdaQueryWrapper = wrapper.eq(AggregateState::getIsDeleted, Boolean.FALSE).or().isNull(AggregateState::getIsDeleted);
         *         }).
         *         ge(Func.isNotEmpty(timeRange.getStartTime()), AggregateState::getTime, timeRange.getStartTime()).le(AggregateState::getTime, timeRange.getEndTime()).orderByAsc(AggregateState::getTime));
         */
        // Aggregate states already stored for this workstation within timeRange.
        List temp = this.workstationAggregateStateMapper.aggregateStatesInTimeRange(workstationId, timeRange);
        if (Func.isNotEmpty(temp)) {
            finallyResult.addAll(temp);
        }
        //}
        // Flag every existing row as deleted. The elements are modified in place, so
        // deleteData holds the same objects as finallyResult.
        List deleteData = finallyResult.stream().map(x2 -> {
            x2.setIsDeleted(Boolean.TRUE);
            return x2;
        }).collect(Collectors.toList());
        // The utility receives both the new states and the rows flagged as deleted.
        return WorkstationStateAggregateNoFeedbackUtils.getReallyEffectWorkstationStateAggregateNoFeedbackList(result, deleteData);
    }
}