collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
@@ -17,21 +17,14 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; //import com.qianwen.core.redis.cache.BladeRedis; import com.qianwen.mdc.collect.dto.WorkstationDTO; import com.qianwen.mdc.collect.entity.mgr.EmployeeOnOffWork; import com.qianwen.mdc.collect.entity.mgr.GlobalWcsOfRps; import com.qianwen.mdc.collect.entity.mgr.Workstation; import com.qianwen.mdc.collect.mapper.mgr.EmployeeOnOffWorkMapper; import com.qianwen.mdc.collect.mapper.mgr.GlobalWcsOfRpsMapper; import com.qianwen.mdc.collect.service.WorkstationService; import com.qianwen.mdc.collect.utils.redis.RedisUtil; import cn.hutool.core.util.ObjectUtil; //import com.qianwen.posting.convert.WorkstationConvert; //import com.qianwen.posting.entity.mysql.EmployeeOnOffWork; //import com.qianwen.posting.entity.mysql.Workstation; //import com.qianwen.posting.mapper.mysql.EmployeeOnOffWorkMapper; //import com.qianwen.posting.service.IWorkstationService; @Component public class WorkstationCache { @@ -49,6 +42,9 @@ private GlobalWcsOfRpsMapper globalWcsOfRpsMapper; @Autowired private WorkstationService workstationService; @Autowired private EmployeeOnOffWorkMapper employeeOnOffWorkMapper; //private static final EmployeeOnOffWorkMapper employeeOnOffWorkMapper = (EmployeeOnOffWorkMapper) SpringUtil.getBean(EmployeeOnOffWorkMapper.class); @@ -155,28 +151,41 @@ return wcsSetting; } /* public static Long getBelongToEmployeeForWorkstation(Long workstationId, Date timePoint) { Long employeeId = null; String redisKey = "posting:workstation".concat("::").concat("workstation:id:").concat(workstationId.toString() .concat(EMPLOYEE)); Set<EmployeeOnOffWork> employeeOnOffWorks = bladeRedis.sMembers(redisKey); EmployeeOnOffWork matchRecord = null; if (Func.isNotEmpty(employeeOnOffWorks)) matchRecord = employeeOnOffWorks.stream().filter(item -> (item.getOnlineTime().getTime() <= timePoint.getTime() && (item.getOfflineTime() == null || item.getOfflineTime().getTime() > timePoint.getTime()))).findFirst().orElse(null); if (Func.isNotEmpty(matchRecord)) { employeeId = matchRecord.getOnEmployeeId(); } else { List<EmployeeOnOffWork> queryResult = employeeOnOffWorkMapper.selectList(Wrappers.<EmployeeOnOffWork>lambdaQuery() .eq(EmployeeOnOffWork::getWorkstationId, workstationId) .le(EmployeeOnOffWork::getOnlineTime, timePoint) .and(wrapper -> wrapper.gt(EmployeeOnOffWork::getOfflineTime, timePoint).or().isNull(EmployeeOnOffWork::getOfflineTime))); if (Func.isNotEmpty(queryResult)) { employeeId = ((EmployeeOnOffWork)queryResult.get(0)).getOnEmployeeId(); bladeRedis.sAdd(redisKey, new Object[] { queryResult.get(0) }); bladeRedis.expire(redisKey, 259200L); } } return employeeId; }*/ /** * è·åå·¥ä½å¨æå®æ¶é´çä¸çåå·¥ * @param workstationId * @param timePoint * @return */ public Long getBelongToEmployeeForWorkstation(Long workstationId, Date timePoint) { Long employeeId = null; String redisKey = "posting:workstation".concat("::").concat("workstation:id:") .concat(workstationId.toString().concat(EMPLOYEE)); // Set<EmployeeOnOffWork> employeeOnOffWorks = bladeRedis.sMembers(redisKey); Set<EmployeeOnOffWork> employeeOnOffWorks = (Set<EmployeeOnOffWork>) redisUtil.sGet(redisKey); EmployeeOnOffWork matchEmployee = null; if (ObjectUtil.isNotEmpty(employeeOnOffWorks)) { matchEmployee = employeeOnOffWorks.stream().filter( item -> (item.getOnlineTime().getTime() <= timePoint.getTime() && (item.getOfflineTime() == null || item.getOfflineTime().getTime() > timePoint.getTime()))) .findFirst().orElse(null); } if (ObjectUtil.isNotEmpty(matchEmployee)) { employeeId = matchEmployee.getOnEmployeeId(); } else { List<EmployeeOnOffWork> queryResult = employeeOnOffWorkMapper.selectList( Wrappers.<EmployeeOnOffWork>lambdaQuery().eq(EmployeeOnOffWork::getWorkstationId, workstationId) .le(EmployeeOnOffWork::getOnlineTime, timePoint) .and(wrapper -> wrapper.gt(EmployeeOnOffWork::getOfflineTime, timePoint).or() .isNull(EmployeeOnOffWork::getOfflineTime))); if (ObjectUtil.isNotEmpty(queryResult)) { employeeId = ((EmployeeOnOffWork) queryResult.get(0)).getOnEmployeeId(); //bladeRedis.sAdd(redisKey, new Object[] { queryResult.get(0) }); //bladeRedis.expire(redisKey, 259200L); redisUtil.sSetAndTime(redisKey, 259200L, new Object[] { queryResult.get(0) }); } } return employeeId; } } collect/src/main/java/com/qianwen/mdc/collect/controller/StateController.java
@@ -1,4 +1,5 @@ package com.qianwen.mdc.collect.controller; import java.util.Arrays; import java.util.List; @@ -9,8 +10,11 @@ import com.baomidou.dynamic.datasource.annotation.DS; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.dto.CalendarShiftInfoDTO; import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO; import com.qianwen.mdc.collect.entity.iotdb.DeviceState; import com.qianwen.mdc.collect.entity.iotdb.Output; import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; import com.qianwen.mdc.collect.mapper.iotdb.DeviceStateMapper; import com.qianwen.mdc.collect.mapper.iotdb.OutputMapper; import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper; import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper; @@ -28,16 +32,27 @@ @RestController public class StateController { @Autowired private DeviceStateAggregateService stateAggregateService; @GetMapping("/stateagg") public Object stateagg(Long workstationId) { stateAggregateService.stateAggregate(workstationId); return "1"; } @Autowired private DeviceStateMapper stateMapper; @GetMapping("/range") public Object range() { long wod = 1656819188967653378L; StateAggregateTimeDTO timeRange = new StateAggregateTimeDTO(); timeRange.setEndTime(System.currentTimeMillis()); List<DeviceState> slist = stateMapper.statesInTimeRange(wod, timeRange); return "2"; } @GetMapping("/stateagg") public Object stateagg(Long workstationId) { stateAggregateService.stateAggregate(workstationId); return "1"; } } collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
@@ -1,36 +1,29 @@ package com.qianwen.mdc.collect.handler; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Date; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.qianwen.mdc.collect.cache.WorkstationCache; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.dto.PackedTelemetryData; import com.qianwen.mdc.collect.entity.iotdb.DeviceState; import com.qianwen.mdc.collect.entity.mgr.GlobalWcsOfRps; import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum; import com.qianwen.mdc.collect.service.DeviceStateService; import com.qianwen.mdc.collect.service.IotDBCommonService; @Component public class DeviceStatusDataHandler implements TelemetryDataHandler { private static final Logger log = LoggerFactory.getLogger(DeviceStatusDataHandler.class); @Autowired private WorkstationCache workstationCache; @Autowired private IotDBSessionConfig iotdbCfg; @Autowired private IotDBCommonService iotDBCommonService; //@Autowired //private IotDBSessionConfig iotdbCfg; //@Autowired //private IotDBCommonService iotDBCommonService; @Autowired private DeviceStateService deviceStateService; @Override @@ -46,7 +39,9 @@ state.setIsFixPoint(false); state.setIsSync(false); state.setIsDeleted(false); //state.setEmployeeId(WorkstationCache.getBelongToEmployeeForWorkstation(entity.getWorkstationId(), new Date(entity.getTs().longValue()))); state.setEmployeeId(workstationCache.getBelongToEmployeeForWorkstation(data.getWorkstationId(), new Date(data.getTime()))); state.setFeedbackPointType(FeedbackTimePointEnum.NO_FEED_BACK_POINT.getValue()); //WorkstationState propertyData = (WorkstationState) Objects.requireNonNull(BeanUtil.copy(entity, WorkstationState.class)); state.setValueCollect(Integer.valueOf(data.getValue())); collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/AggregateStateMapper.java
@@ -1,13 +1,24 @@ package com.qianwen.mdc.collect.mapper.iotdb; import java.util.List; import org.apache.ibatis.annotations.Param; import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.mybatisplus.annotation.InterceptorIgnore; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO; import com.qianwen.mdc.collect.entity.iotdb.AggregateState; @DS("iotdb") @InterceptorIgnore(tenantLine = "true") public interface AggregateStateMapper extends BaseMapper<AggregateState> { /** * è·å¾æ¶é´åºé´å çèåç¶ææ°æ® * @param workstationId * @param timeRange * @return */ List<AggregateState> aggregateStatesInTimeRange(@Param("workstationId") Long workstationId,@Param("timeRange") StateAggregateTimeDTO timeRange); } collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/EmployeeOnOffWorkMapper.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,7 @@ package com.qianwen.mdc.collect.mapper.mgr; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.qianwen.mdc.collect.entity.mgr.EmployeeOnOffWork; public interface EmployeeOnOffWorkMapper extends BaseMapper<EmployeeOnOffWork> { } collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.utils.redis.RedisUtil; @@ -32,7 +33,7 @@ public class CollectDataService { private static final Logger log = LoggerFactory.getLogger(CollectDataService.class); private String DB_PREFIX = "root.f2."; //private String DB_PREFIX = "root.f2."; private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); @Autowired private IotDBSessionConfig iotdbConfig; @@ -224,7 +225,7 @@ } String generateDeviceId(long workstationId,String propertyName) { return DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName; return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName; } } collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
@@ -37,7 +37,8 @@ private AggregateStateMapper workstationAggregateStateMapper; @Autowired private IotDBSessionConfig iotdbConfig; @Autowired private IotDBCommonService iotDBCommonService; public List<AggregateState> stateAggregateForSpecialTimeRange(Long workstationId, StateAggregateTimeDTO timeRange, List<DeviceState> effectiveStateList) { //ætimeRangeæ¥è¯¢æ¶é´åºé´å çç¶ææ°æ®ï¼é¤äºå·²å é¤çï¼å ¶ä»æ°æ®é½æ¥åºæ¥äº @@ -90,7 +91,7 @@ schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32)); schemas.add(new MeasurementSchema("wcs", TSDataType.INT32)); schemas.add(new MeasurementSchema("rps", TSDataType.INT32)); schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN)); //schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN)); schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32));//TODO è¿ä¸ªå±æ§åºè¯¥æ¯GlobalWcsOfRpsä¸çå¼ï¼å¦ä½å¡«åï¼ schemas.add(new MeasurementSchema("feedback_id", TSDataType.INT64)); schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN)); @@ -98,8 +99,11 @@ //å½åï¼aggregate_state_{workstation_id} String deviceId = IOTDBConstant.DB_PREFIX+"aggregate_state_"+workstationId; iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATESTATE, deviceId);//æè½½æ¨¡æ¿ Tablet tablet = new Tablet(deviceId, schemas); //tablet.rowSize = aggStates.size(); tablet.rowSize = aggStates.size(); AggregateState aggState; for(int i=0;i<aggStates.size();i++) { @@ -118,7 +122,7 @@ tablet.addValue("shift_time_type", i, aggState.getShiftTimeType()); tablet.addValue("wcs", i, aggState.getWcs()); tablet.addValue("rps", i, aggState.getRps()); tablet.addValue("is_sync", i, aggState.getIsSync()); tablet.addValue("is_plan", i, aggState.getIsPlan()); tablet.addValue("feedback_id", i, aggState.getFeedbackId()); tablet.addValue("is_deleted", i, aggState.getIsDeleted()); @@ -127,6 +131,7 @@ } try { this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); log.info("ä¿åèåç¶æå®æ"); } catch (Exception e) { log.error("ä¿åèåç¶ææ°æ®å¼å¸¸",e); } @@ -142,11 +147,14 @@ */ List<AggregateState> finallyResult = new ArrayList<>(); //for (StateAggregateTimeDTO item : effectTimeRangeList2) { /* List<AggregateState> temp = this.workstationAggregateStateMapper.selectList(Wrappers.<AggregateState>lambdaQuery() .eq(AggregateState::getWorkstationId, workstationId).and(wrapper -> { LambdaQueryWrapper lambdaQueryWrapper = wrapper.eq(AggregateState::getIsDeleted, Boolean.FALSE).or().isNull(AggregateState::getIsDeleted); }). ge(Func.isNotEmpty(timeRange.getStartTime()), AggregateState::getTime, timeRange.getStartTime()).le(AggregateState::getTime, timeRange.getEndTime()).orderByAsc(AggregateState::getTime)); */ List<AggregateState> temp = this.workstationAggregateStateMapper.aggregateStatesInTimeRange(workstationId, timeRange); if (Func.isNotEmpty(temp)) { finallyResult.addAll(temp); } collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
@@ -62,7 +62,7 @@ public void handle(List<TelemetryData> telemetryDataList) { List<PackedTelemetryData> pdataList = packetData(telemetryDataList); System.out.print(pdataList); //System.out.print(pdataList); for (PackedTelemetryData pdt : pdataList) { TelemetryDataHandler handler = handlerSelector.select(pdt.getParamType()); if (handler == null) { @@ -91,6 +91,7 @@ String[] keys = map.keySet().toArray(new String[0]); for (int j = 0; j < keys.length; j++) { //TODO: è¿éï¼åç³»ç»è¿è¡äºè¿æ»¤ãWorkstationCollectDataServiceImpl.handlerWorkstationCollectDataé PackedTelemetryData pkData = new PackedTelemetryData(); pkData.setWorkstationId(tdata.getWorkstationId()); pkData.setValue(map.get(keys[j])); @@ -129,8 +130,8 @@ pdata.setCalendarCode("#default#"); } pdata.setShiftIndex(1);//ä¸´æ¶ pdata.setShiftTimeType(1);//ä¸´æ¶ //pdata.setShiftIndex(1);//ä¸´æ¶ //pdata.setShiftTimeType(1);//ä¸´æ¶ // TelemetryPropertyWrapperä¸packWorkstationCalendarInfo //------------start collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java
@@ -303,7 +303,7 @@ * @param key é® * @return */ public Set<Object> sGet(String key) { public Set<? extends Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/AggregateStateMapper.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,50 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.qianwen.mdc.collect.mapper.iotdb.AggregateStateMapper"> <resultMap id="BaseResultMap" type="com.qianwen.mdc.collect.entity.iotdb.AggregateState"> <result column="workstation_id" jdbcType="BIGINT" property="workstationId"/> <result column="end_time" jdbcType="BIGINT" property="endTime"/> <result column="duration_collect" jdbcType="BIGINT" property="durationCollect"/> <result column="value_collect" jdbcType="BIGINT" property="valueCollect"/> <result column="calendar_code" jdbcType="VARCHAR" property="calendarCode"/> <result column="factory_year" jdbcType="INTEGER" property="factoryYear"/> <result column="factory_month" jdbcType="INTEGER" property="factoryMonth"/> <result column="factory_week" jdbcType="INTEGER" property="factoryWeek"/> <result column="factory_date" jdbcType="INTEGER" property="factoryDate"/> <result column="shift_index" jdbcType="INTEGER" property="shiftIndex"/> <result column="shift_time_type" jdbcType="INTEGER" property="shiftTimeType"/> <result column="wcs" jdbcType="INTEGER" property="wcs"/> <result column="rps" jdbcType="INTEGER" property="rps"/> <result column="feedback_id" jdbcType="BIGINT" property="feedbackId"/> <result column="is_deleted" jdbcType="BOOLEAN" property="isDeleted"/> <result column="is_plan" jdbcType="BIGINT" property="isPlan"/> <result column="employee_id" jdbcType="BIGINT" property="employeeId"/> </resultMap> <sql id="all_columns"> workstation_id,end_time,duration_collect,value_collect,calendar_code,factory_year,factory_month,factory_date,factory_week,shift_index,shift_time_type,wcs,rps,is_fix_point,is_sync,is_plan,feedback_point_type,feedback_id,is_deleted,employee_id </sql> <select id="aggregateStatesInTimeRange" resultMap="BaseResultMap"> select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId} <where> is_deleted=false <if test="timeRange.startTime != null"> AND time >= #{timeRange.startTime} </if> AND time <= #{timeRange.endTime} </where> order by time asc align by device </select> </mapper> collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
@@ -5,7 +5,6 @@ <resultMap id="BaseResultMap" type="com.qianwen.mdc.collect.entity.iotdb.DeviceState"> <result column="workstation_id" jdbcType="BIGINT" property="workstationId"/> <result column="value_collect" jdbcType="BIGINT" property="valueCollect"/> <result column="param_type" jdbcType="INTEGER" property="paramType"/> <result column="calendar_code" jdbcType="VARCHAR" property="calendarCode"/> <result column="factory_year" jdbcType="INTEGER" property="factoryYear"/> <result column="factory_month" jdbcType="INTEGER" property="factoryMonth"/> @@ -14,10 +13,11 @@ <result column="shift_index" jdbcType="INTEGER" property="shiftIndex"/> <result column="shift_time_type" jdbcType="INTEGER" property="shiftTimeType"/> <result column="wcs" jdbcType="INTEGER" property="wcs"/> <result column="rps" jdbcType="INTEGER" property="wcs"/> <result column="rps" jdbcType="INTEGER" property="rps"/> <result column="is_sync" jdbcType="BOOLEAN" property="isSync"/> <result column="feedback_id" jdbcType="BIGINT" property="feedbackId"/> <result column="is_deleted" jdbcType="BOOLEAN" property="isDeleted"/> <result column="is_plan" jdbcType="BIGINT" property="isPlan"/> <result column="employee_id" jdbcType="BIGINT" property="employeeId"/> </resultMap> <!-- @@ -80,9 +80,9 @@ where is_sync=false and is_fix_point=false and feedback_point_type=0 and is_deleted=false order by time asc limit 1 </select> <!-- --> <select id="statesInTimeRange" resultMap="BaseResultMap"> select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId} select <include refid="all_columns" /> FROM root.f2.state_${workstationId} <where> is_deleted=false <if test="timeRange.startTime != null"> @@ -92,8 +92,23 @@ </where> order by time asc align by device </select> <!-- åªè½ç¨as åresultTypeçå½¢å¼ï¼ï¼ï¼ resultType="com.qianwen.mdc.collect.entity.iotdb.DeviceState" <select id="statesInTimeRange" resultMap="BaseResultMap"> select <include refid="all_columns" /> FROM root.f2.state_${workstationId} <where> is_deleted=false <if test="timeRange.startTime != null"> AND time >= #{timeRange.startTime} </if> AND time <= #{timeRange.endTime} </where> order by time asc align by device </select>--> <select id="fixPointCountByDate" resultType="Long"> select count(factory_date) FROM root.f2.state_* where factory_date=#{factoryDate} and is_fix_point=true </select>