From 04d53749b21921c9bceebe120d170c2ee6e533af Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期三, 13 十一月 2024 21:21:46 +0800 Subject: [PATCH] 增加离线检查定时任务的逻辑 --- collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java | 5 collect/src/main/resources/application.yml | 2 collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java | 30 +++ collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java | 6 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java | 19 - collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java | 5 collect/src/main/resources/application-dev.yml | 6 collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java | 2 collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java | 5 collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java | 106 ++++++++++ collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java | 19 + collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java | 51 ++-- collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java | 79 +++++++ collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 1 collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java | 4 collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java | 11 collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java | 15 - collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java | 100 ++++++++++ collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java | 103 ++++++++++ collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml | 8 collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java | 10 collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java | 2 22 files changed, 512 insertions(+), 77 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java index dc91d11..7f40c31 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java @@ -107,12 +107,7 @@ } return result; } - /* - public static Boolean clearWorkStationCache() { - String redisKey = "posting:workstation".concat("::").concat(WORKSTATION_ALL); - return bladeRedis.del(redisKey); - } - */ + /** * 鑾峰彇鎸囧畾鏃ユ湡鐨勬棩鍘嗕唬鐮� * @param workstationId @@ -135,15 +130,15 @@ return calendarCode; } - public GlobalWcsOfRps getWorkstationWcsSetting(Long workstationId, String code) { + public GlobalWcsOfRps getWorkstationWcsSetting(Long workstationId, String deviceStatusCode) { String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ID).concat(workstationId.toString() .concat(WCS_SETTING)); //GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hGet(redisKey, code); - GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hget(redisKey, code); + GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hget(redisKey, deviceStatusCode); if (wcsSetting == null) { wcsSetting = globalWcsOfRpsMapper.selectOne(Wrappers.<GlobalWcsOfRps>lambdaQuery() - .eq(GlobalWcsOfRps::getCode, code) + .eq(GlobalWcsOfRps::getCode, deviceStatusCode) .isNull(GlobalWcsOfRps::getPrecondition)); if(wcsSetting == null) { wcsSetting = new GlobalWcsOfRps(); @@ -153,7 +148,7 @@ //wcsSetting = Func.isNotEmpty(wcsSetting) ? wcsSetting : GlobalWcsOfRps.builder().rps(0).isPlan(0).build(); //bladeRedis.hSet(redisKey, code, wcsSetting); //bladeRedis.expire(redisKey, Duration.ofDays(1L)); - redisUtil.hset(redisKey, code, wcsSetting, Duration.ofDays(1L).getSeconds()); + redisUtil.hset(redisKey, deviceStatusCode, wcsSetting, Duration.ofDays(1L).getSeconds()); } return wcsSetting; } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java index e96c3f6..592d1e1 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java @@ -41,6 +41,10 @@ @Value("${mqtt.password:}") private String mqttPassword; + + @Value("${mqtt.timeout:1000}") + private int timeout; + @Autowired private IOTMqttReceiveService recService; @Autowired @@ -73,6 +77,7 @@ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" + options.setConnectionTimeout(timeout); if(ObjectUtil.isNotEmpty(mqttUserName)) { options.setUserName(mqttUserName); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java new file mode 100644 index 0000000..27b05b9 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java @@ -0,0 +1,19 @@ +package com.qianwen.mdc.collect.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties(prefix = "mqtt") +public class MqttProperties { + private String host; + private String username; + private String password; + private String connectionTimeout; + + private int keepalive; + private String dataReceiveTopic; + private int timeout; + private boolean cleansession; + +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java b/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java index ecdf315..44cf5ca 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java @@ -36,4 +36,9 @@ * 鍛婅妯℃澘鍚嶇О alarm */ public static final String TEMPLATE_ALARM = "alarm"; + + /** + * 杩囩▼鍙傛暟琛ㄦā鏉� + */ + public static final String TEMPLATE_PROCESS_PARAM = "process_param"; } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java index acebd02..8721b46 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java @@ -23,6 +23,7 @@ import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper; import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper; import com.qianwen.mdc.collect.mqtt.MqttMessageSender; +import com.qianwen.mdc.collect.service.DeviceOfflineCheckService; import com.qianwen.mdc.collect.service.DeviceStateFixPointService; import com.qianwen.mdc.collect.service.IOTMqttReceiveService; import com.qianwen.mdc.collect.service.IotDBCommonService; @@ -37,7 +38,8 @@ public class CalController { @Autowired private TimeSliceCache timeSliceCache; - + @Autowired + DeviceOfflineCheckService offService; String calCode= "1"; @GetMapping("/gett") @@ -58,4 +60,10 @@ return "settOK,targetDate="+targetDate; } + + @GetMapping("/offline") + public Object off(long wid) { + offService.handleWorkstationOffline(wid); + return "1"; + } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java index a88977d..97e4bb7 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java @@ -53,9 +53,9 @@ @GetMapping("/last") public Object last() { - List<ProcessParam> list1= mapper.mylist(1656819188967653378L); - System.out.print(list1); - return list1; + //ProcessParam last= mapper.lastParamInDuration(1656819188967653378L,300000); + //System.out.print(list1); + return "dd"; } @DS("iotdb") diff --git a/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java b/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java index 15571e5..60fac22 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java @@ -1,7 +1,9 @@ package com.qianwen.mdc.collect.dto; import java.util.Date; - +/** + * 鐢熶骇鏃ュ巻鐨勭彮鍒朵俊鎭� + */ public class CalendarShiftInfoDTO { private Long workstationId; private String code; diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java new file mode 100644 index 0000000..3c0e4e5 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java @@ -0,0 +1,30 @@ +package com.qianwen.mdc.collect.job; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.qianwen.mdc.collect.service.DeviceOfflineCheckService; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import com.xxl.job.core.log.XxlJobLogger; + +@Component +public class DeviceOfflineStatusCheckJob { + private static final Logger log = LoggerFactory.getLogger(DeviceOfflineStatusCheckJob.class); + @Resource + private DeviceOfflineCheckService offlineCheckService; + + + @XxlJob("offlineCheckJobHandler") + public ReturnT<String> offlineCheckJobHandler(String param) throws Exception { + XxlJobLogger.log("瀹氭椂鐩戞帶宸ヤ綅鏄惁绂荤嚎",param); + + offlineCheckService.checkOffline(); + log.info("绂荤嚎鐘舵�佸鐞嗙粨鏉�"); + XxlJobLogger.log("瀹氭椂鐩戞帶宸ヤ綅鏄惁绂荤嚎瀹屾垚", new Object[0]); + return ReturnT.SUCCESS; + } +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java b/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java index f0ac7cc..25aa371 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java @@ -1,7 +1,5 @@ package com.qianwen.mdc.collect.mapper.iotdb; -import java.util.List; - import org.apache.ibatis.annotations.Param; import com.baomidou.dynamic.datasource.annotation.DS; @@ -10,7 +8,12 @@ @DS("iotdb") public interface ProcessParamMapper { - - List<ProcessParam> mylist(@Param("workstationId") Long workstationId); + /** + * 鏌ヨ宸ヤ綅鍦ㄦ渶杩慸uration姣鍐呮渶鍚庝竴涓弬鏁� + * @param workstationId + * @param duration + * @return + */ + ProcessParam lastParamByWorstationId(@Param("workstationId") Long workstationId); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java index 8724fa7..39b4eb0 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java @@ -110,6 +110,6 @@ shiftEndTime = endTime; } CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = CalendarShiftTimeSlicesDTO.builder().shiftTimeType(productionCalendarDaytime.getShiftType().toString()).endTime(LocalDateTimeUtils.LocalDateTimeToDate(shiftEndTime)).startTime(LocalDateTimeUtils.LocalDateTimeToDate(shiftStartTime)).shiftIndex(productionCalendarDaytime.getShiftIndex()).shiftTimeType(productionCalendarDaytime.getShiftType().toString()).factoryDate(LocalDateTimeUtils.formatTimeLocalDate(productionCalendarDaytime.getCalendarDate(), "yyyy-MM-dd")).factoryMonth(productionCalendarDaytime.getMonth()).factoryWeek(productionCalendarDaytime.getWeek()).factoryYear(productionCalendarDaytime.getYear()).build(); - timeSlicesDTOMap.put(Integer.valueOf(minutesOfDay), calendarShiftTimeSlicesDTO); + timeSlicesDTOMap.put(minutesOfDay, calendarShiftTimeSlicesDTO); } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java index f599308..73fe491 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java @@ -140,7 +140,6 @@ Tablet tablet = new Tablet(deviceId, schemas); for(TypedTelemetryData tdata : typeList) { - rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, tdata.getTime()); tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId()); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java new file mode 100644 index 0000000..d73cce8 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java @@ -0,0 +1,100 @@ +package com.qianwen.mdc.collect.service; + +import java.util.Arrays; +import java.util.Date; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import com.qianwen.mdc.collect.cache.WorkstationCache; +import com.qianwen.mdc.collect.dto.WorkstationDTO; +import com.qianwen.mdc.collect.entity.iotdb.DeviceState; +import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; +import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum; +import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper; +import com.qianwen.mdc.collect.vo.FactoryDataVO; + +import cn.hutool.core.util.ObjectUtil; +/** + * 璁惧绂荤嚎澶勭悊鏈嶅姟,瓒呰繃涓�瀹氭椂闂存棤閲囬泦鏁版嵁鍒ゅ畾涓虹绾匡紝鎻掑叆绂荤嚎鐘舵�佹暟鎹� + */ +@Service +public class DeviceOfflineCheckService{ + private static final Logger log = LoggerFactory.getLogger(DeviceOfflineCheckService.class); + /** + * 绂荤嚎鍒ゅ畾鏃堕暱锛岄粯璁�5鍒嗛挓 + */ + @Value("${offlineConfigDuration:3000000}") + private long offlineConfigDuration; + public static final String OFFLINE_VALUE = "4";//绂荤嚎鐘舵�佸�� + @Autowired + private ProcessParamService paramService; + + @Autowired + private WorkstationCache workstationCache; + @Autowired + private ProcessParamMapper processParamMapper; + + @Autowired + private FactoryDataService factoryDataService; + /** + * 淇濆瓨鐘舵�佸浐瀹氱偣鏁版嵁(state_{workstationId}) + * @param workstationIdList + */ + public void checkOffline() { + //灏嗘暟鎹寜鐓у伐浣峣d鍒嗙粍 + Map<String, WorkstationDTO> workstationsMap = workstationCache.getWorkstations(); + if (ObjectUtil.isEmpty(workstationsMap)) { + log.info("缂撳瓨鏃犺澶囨暟鎹紝閫�鍑�"); + return; + } + + Set<String> workstationIds = workstationsMap.keySet(); + + log.info("绂荤嚎妫�鏌ュ伐浣嶆�绘暟:{}",workstationsMap.size()); + + for(String workstationId :workstationIds) { + log.info("寮�濮嬫鏌ュ伐浣峽}鐨勭绾跨姸鎬�",workstationId); + Long wid = Long.parseLong(workstationId); + handleWorkstationOffline(wid); + } + + } + /** + * 妫�鏌ヤ竴涓伐浣嶆槸鍚︾绾匡紝绂荤嚎鍒欏~鍏呯绾跨姸鎬佹暟鎹� + * @param workstationId + */ + public void handleWorkstationOffline(long workstationId) { + ProcessParam lastParam = processParamMapper.lastParamByWorstationId(workstationId); + + long nowMills = System.currentTimeMillis(); + long onlineRange = nowMills - offlineConfigDuration; + if(lastParam != null && lastParam.getTime()> onlineRange) { + //鏈夋暟鎹紝涓斿湪鍒ゅ畾鏃堕棿鍐�-銆嬪湪绾� + return; + } + //TODO 杩欓噷涓�鐩寸绾挎槸浠�涔堟暟鎹� + + //鏃堕棿娈靛唴鏃犲弬鏁帮紝璇存槑璁惧娌¢噰闆嗘暟鎹垽瀹氫负绂荤嚎锛屾彃鍏ョ姸鎬侊紝鐒跺悗鍙戦�乺ealTime娑堟伅缁檓dc + + ProcessParam statusParam = new ProcessParam(); + statusParam.setTime(nowMills); + statusParam.setN("DeviceStatus"); + statusParam.setV(OFFLINE_VALUE); + statusParam.setWorkstationId(workstationId); + if(lastParam == null || !lastParam.getN().equals("DeviceStatus")) { + //涔嬪墠鏃犱换浣曢噰闆嗙殑鏁版嵁(鎴栬�呬笂涓�鏉′笉鏄姸鎬佹暟鎹�)锛屾柊鍔犱竴鏉$绾跨姸鎬佹暟鎹� + paramService.insertProcessParam(statusParam); + } + + //閫氱煡mdc鏇存柊瀹炴椂鐘舵�� + paramService.sendRealtimeDataMsg(statusParam); + + } + +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java index ca31fd4..8db7e81 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java @@ -289,29 +289,36 @@ return default24HourPointDTOList; } + /** + * 濉厖璁惧鐘舵�佺殑鐝淇℃伅 + * @param calendarShiftList + * @param default24HourPointDTOList + * @param state + */ private void packCalendarShiftInfoForTimePoint(List<CalendarShiftInfoDTO> calendarShiftList, List<DeviceState> default24HourPointDTOList, DeviceState state) { - if (Func.isNotEmpty(calendarShiftList)) { - CalendarShiftInfoDTO relatedShift = calendarShiftList.stream().filter(item -> { - return item.getStartTime().getTime() <= state.getTime().longValue() && item.getEndTime().getTime() > state.getTime().longValue(); - }).findFirst().orElse(null); - - if (Func.isNotEmpty(relatedShift)) { - state.setShiftIndex(relatedShift.getShiftIndex()); - state.setShiftTimeType(relatedShift.getShiftTimeType()); - state.setFactoryYear(relatedShift.getFactoryYear()); - state.setFactoryMonth(relatedShift.getFactoryMonth()); - state.setFactoryWeek(relatedShift.getFactoryWeek()); - String factoryDate = relatedShift.getFactoryDate(); - String[] split = Func.split(factoryDate, "-"); - state.setFactoryDate(Integer.valueOf(String.join("", split))); - state.setIsDeleted(Boolean.FALSE); - default24HourPointDTOList.add(state); - return; - } - log.warn("宸ヤ綅{} 鏃ュ巻{} 鏈壘鍒版暣鐐圭彮娆′俊鎭�", state.getWorkstationId(), state.getCalendarCode()); - return; + if (Func.isEmpty(calendarShiftList)) { + log.error("宸ヤ綅{} 鏃ュ巻{} 鏃犳棩鏈�:[{}]鐨勭彮娆′俊鎭�", new Object[]{state.getWorkstationId(), state.getCalendarCode(), state.getTime()}); + return; + } + + CalendarShiftInfoDTO relatedShift = calendarShiftList.stream().filter(item -> { + return item.getStartTime().getTime() <= state.getTime() && item.getEndTime().getTime() > state.getTime(); + }).findFirst().orElse(null); + + if (Func.isNotEmpty(relatedShift)) { + state.setShiftIndex(relatedShift.getShiftIndex()); + state.setShiftTimeType(relatedShift.getShiftTimeType()); + state.setFactoryYear(relatedShift.getFactoryYear()); + state.setFactoryMonth(relatedShift.getFactoryMonth()); + state.setFactoryWeek(relatedShift.getFactoryWeek()); + String factoryDate = relatedShift.getFactoryDate(); + String[] split = Func.split(factoryDate, "-"); + state.setFactoryDate(Integer.valueOf(String.join("", split))); + state.setIsDeleted(false); + default24HourPointDTOList.add(state); + }else { + log.warn("宸ヤ綅{} 鏃ュ巻{} 鏈壘鍒板浐瀹氱偣鐝淇℃伅", state.getWorkstationId(), state.getCalendarCode()); } - log.error("宸ヤ綅{} 鏃ュ巻{} 鏃犺嚜鐒跺ぉ{}鐝淇℃伅", new Object[]{state.getWorkstationId(), state.getCalendarCode(), state.getTime()}); } //涓嶄竴瀹氳兘鐢ㄤ笂 @@ -325,7 +332,7 @@ workstationState.setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX); workstationState.setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE); workstationState.setFeedbackPointType(FeedbackTimePointEnum.NO_FEED_BACK_POINT.getValue()); - workstationState.setIsDeleted(Boolean.FALSE); + workstationState.setIsDeleted(false); } } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java index a59fce6..8a68ca3 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java @@ -1,10 +1,6 @@ package com.qianwen.mdc.collect.service; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -18,24 +14,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import com.google.common.collect.Lists; -import com.qianwen.core.tool.utils.Func; -import com.qianwen.core.tool.utils.SpringUtil; -import com.qianwen.mdc.collect.cache.WorkstationCache; import com.qianwen.mdc.collect.config.IotDBSessionConfig; -import com.qianwen.mdc.collect.constants.CommonConstant; import com.qianwen.mdc.collect.constants.IOTDBConstant; -import com.qianwen.mdc.collect.dto.CalendarShiftInfoDTO; -import com.qianwen.mdc.collect.dto.WorkstationDTO; import com.qianwen.mdc.collect.entity.iotdb.DeviceState; -import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum; -import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper; -import com.qianwen.mdc.collect.utils.LocalDateTimeUtils; - -import cn.hutool.core.date.DatePattern; -import cn.hutool.core.date.DateTime; -import cn.hutool.core.date.DateUtil; -import cn.hutool.core.util.ObjectUtil; /** * 璁惧鐘舵�佹櫘閫氭湇鍔� */ diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java new file mode 100644 index 0000000..0afa9c1 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java @@ -0,0 +1,106 @@ +package com.qianwen.mdc.collect.service; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Sets; +import com.qianwen.mdc.collect.cache.TimeSliceCache; +import com.qianwen.mdc.collect.cache.WorkstationCache; +import com.qianwen.mdc.collect.constants.CommonConstant; +import com.qianwen.mdc.collect.domain.TelemetryData; +import com.qianwen.mdc.collect.domain.TelemetryDataItem; +import com.qianwen.mdc.collect.dto.CacheBuildDTO; +import com.qianwen.mdc.collect.dto.CalendarShiftTimeSlicesDTO; +import com.qianwen.mdc.collect.dto.PackedTelemetryData; +import com.qianwen.mdc.collect.handler.PackedTelemetryDataHandlerSelector; +import com.qianwen.mdc.collect.handler.TelemetryDataHandler; +import com.qianwen.mdc.collect.utils.LocalDateTimeUtils; +import com.qianwen.mdc.collect.vo.FactoryDataVO; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.ObjectUtil; + +/** + * 宸ュ巶鏁版嵁鑾峰彇 + */ +@Service +public class FactoryDataService { + private static final Logger log = LoggerFactory.getLogger(FactoryDataService.class); + + private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); + + + @Autowired + private WorkstationCache workstationCache; + @Autowired + private TimeSliceCache timeSliceCache; + + /** + * 濉厖鐢熶骇鏃ュ巻code锛屼互鍙奻actoryyear + * + * @param pdata + */ + public FactoryDataVO getFactoryData(long workstationId,long dataCollectTime) { + //鍘熸潵鍦═elemetryPropertyWrapper.packWorkstationCalendarInfo涓畬鎴� + FactoryDataVO result = new FactoryDataVO(); + + Date collectTime = new Date(dataCollectTime); + String calendarCode = workstationCache.getWorkstationCalendarCodeForDate(workstationId, DateUtil.formatDate(DateUtil.date(collectTime))); + if (ObjectUtil.isNotEmpty(calendarCode)) { + result.setCalendarCode(calendarCode); + } else { + //telemetryData.setCalendarCode("#default#"); + result.setCalendarCode("#default#"); + } + + // TelemetryPropertyWrapper涓璸ackWorkstationCalendarInfo + //------------start + boolean isDefaultCalendar = true; + if (!"#default#".equals(result.getCalendarCode())) { + CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(result.getCalendarCode(), collectTime);//浠巖edis涓幏寰楁棩鍘嗙殑鏃堕棿鍒囩墖 + if (ObjectUtil.isEmpty(calendarShiftTimeSlicesDTO)) {//濡傛灉娌℃湁鏃堕棿鍒囩墖锛屽垯浣跨敤TimeSliceCache.build(cacheBuildDTO);鏋勫缓 + LocalDate targetDate = Instant.ofEpochMilli(dataCollectTime).atZone(ZoneOffset.systemDefault()).toLocalDate(); + CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).calendarCode(calendarCode).targetDate(targetDate).build(); + timeSliceCache.build(cacheBuildDTO); + calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(result.getCalendarCode(), collectTime); + } + if (ObjectUtil.isNotEmpty(calendarShiftTimeSlicesDTO)) { + result.setShiftIndex(calendarShiftTimeSlicesDTO.getShiftIndex()); + result.setShiftTimeType(Integer.valueOf(calendarShiftTimeSlicesDTO.getShiftTimeType())); + result.setFactoryDate(Integer.valueOf(calendarShiftTimeSlicesDTO.getFactoryDate().replaceAll("-", ""))); + result.setFactoryWeek(calendarShiftTimeSlicesDTO.getFactoryWeek()); + result.setFactoryMonth(calendarShiftTimeSlicesDTO.getFactoryMonth()); + result.setFactoryYear(calendarShiftTimeSlicesDTO.getFactoryYear()); + isDefaultCalendar = false; + } + } + + //鏃犳棩鍘嗗垏鐗囷紝浣跨敤閲囬泦鏃堕棿濉厖factoryYear锛宮onth锛宒ate锛寃eek鍑犱釜灞炴�� + if (isDefaultCalendar) { + log.info("鏃犳棩鍘嗗垏鐗�"); + LocalDate localDate = Instant.ofEpochMilli(dataCollectTime).atZone(ZoneOffset.systemDefault()).toLocalDate(); + result.setFactoryDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(collectTime))); + result.setFactoryYear(DateUtil.year(collectTime)); + result.setFactoryWeek(LocalDateTimeUtils.getWeek(localDate)); + result.setFactoryMonth(DateUtil.month(collectTime) + 1); + result.setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX); + result.setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE); + } + + + return result; + } + +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java index fe8229d..9984718 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java @@ -51,7 +51,7 @@ try { iotdbCfg.getSessionPool().setSchemaTemplate(template, deviceId); } catch (Exception e) { - logger.error("鑾峰彇妯℃澘浣跨敤閿欒,template="+template+",deviceId="+deviceId,e); + logger.error("涓篸eviceId璁剧疆妯℃澘閿欒,template="+template+",deviceId="+deviceId,e); } } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java index 012bd36..d53dc1f 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java @@ -47,8 +47,6 @@ private WorkstationCache workstationCache; @Autowired private TimeSliceCache timeSliceCache; - @Autowired - private WorkstationDatapointsService dpService; static { PROCESS_PARAM_MAP.put(1, "STATE"); @@ -130,9 +128,6 @@ //telemetryData.setCalendarCode("#default#"); pdata.setCalendarCode("#default#"); } - - //pdata.setShiftIndex(1);//涓存椂 - //pdata.setShiftTimeType(1);//涓存椂 // TelemetryPropertyWrapper涓璸ackWorkstationCalendarInfo //------------start diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java new file mode 100644 index 0000000..e923804 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java @@ -0,0 +1,103 @@ +package com.qianwen.mdc.collect.service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.alibaba.fastjson.JSONObject; +import com.qianwen.core.tool.utils.ObjectUtil; +import com.qianwen.mdc.collect.config.IotDBSessionConfig; +import com.qianwen.mdc.collect.constants.IOTDBConstant; +import com.qianwen.mdc.collect.domain.TelemetryData; +import com.qianwen.mdc.collect.domain.TelemetryDataItem; +import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; +import com.qianwen.mdc.collect.mqtt.MqttMessageSender; +import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; + +/** + * 閲囬泦鏁版嵁澶勭悊鍏ュ簱 + */ +@Service +public class ProcessParamService { + private static final Logger log = LoggerFactory.getLogger(ProcessParamService.class); + + private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); + @Autowired + private IotDBSessionConfig iotdbConfig; + @Autowired + private IotDBCommonService iotDBCommonService; + @Autowired + private MqttMessageSender mqttMessageSender; + + + public static List<MeasurementSchema> schemas = new ArrayList<>(); + static { + schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); + schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); + schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); + } + /** + * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚� + */ + public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; + + + public void insertProcessParam(ProcessParam param) { + String deviceId = generateDeviceId(param.getWorkstationId(),param.getN()); + + iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId); + + Tablet tablet = new Tablet(deviceId, schemas); + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, param.getTime()); + tablet.addValue("workstation_id",rowIndex,param.getWorkstationId()); + tablet.addValue("n",rowIndex,param.getN()); + tablet.addValue("v",rowIndex,param.getV()); + try { + iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + //updateLastParam(dt.getWorkstationId(),typeList); + } catch (Exception e) { + log.error("IOTDB鍏ュ簱澶辫触",e); + }finally { + //iotdbConfig.getSessionPool().clo1se(); + } + } + + + /** + * 鍙戦�佸疄鏃舵暟鎹秷鎭� + * @param param + */ + public void sendRealtimeDataMsg(ProcessParam param) { + + //鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡 + + JSONObject json = new JSONObject(); + json.put("workstationId",param.getWorkstationId()); + json.put("name", param.getN()); + json.put("value", param.getV()); + json.put("time", param.getTime()); + + mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString()); + + } + + + String generateDeviceId(long workstationId,String propertyName) { + return IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_PROCESS_PARAM+"_" + workstationId+"_"+propertyName; + } +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java b/collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java new file mode 100644 index 0000000..d5d12bc --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java @@ -0,0 +1,79 @@ +package com.qianwen.mdc.collect.vo; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +import cn.hutool.core.util.ObjectUtil; +import io.swagger.annotations.ApiModelProperty; + + +/** + * 宸ュ巶鏁版嵁VO + */ +public class FactoryDataVO implements Serializable{ + + /** + * 搴忓垪鍖栵紝搴斾负闇�瑕乻pring缂撳瓨 + */ + private static final long serialVersionUID = 6558493027948435061L; + + private String calendarCode; + private Integer factoryYear; + private Integer factoryMonth; + private Integer factoryWeek; + private Integer factoryDate; + private Integer shiftIndex; + private Integer shiftTimeType; + public String getCalendarCode() { + return calendarCode; + } + public void setCalendarCode(String calendarCode) { + this.calendarCode = calendarCode; + } + public Integer getFactoryYear() { + return factoryYear; + } + public void setFactoryYear(Integer factoryYear) { + this.factoryYear = factoryYear; + } + public Integer getFactoryMonth() { + return factoryMonth; + } + public void setFactoryMonth(Integer factoryMonth) { + this.factoryMonth = factoryMonth; + } + public Integer getFactoryWeek() { + return factoryWeek; + } + public void setFactoryWeek(Integer factoryWeek) { + this.factoryWeek = factoryWeek; + } + public Integer getFactoryDate() { + return factoryDate; + } + public void setFactoryDate(Integer factoryDate) { + this.factoryDate = factoryDate; + } + public Integer getShiftIndex() { + return shiftIndex; + } + public void setShiftIndex(Integer shiftIndex) { + this.shiftIndex = shiftIndex; + } + public Integer getShiftTimeType() { + return shiftTimeType; + } + public void setShiftTimeType(Integer shiftTimeType) { + this.shiftTimeType = shiftTimeType; + } + + + +} + diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml index a824160..66a4cb5 100644 --- a/collect/src/main/resources/application-dev.yml +++ b/collect/src/main/resources/application-dev.yml @@ -1,6 +1,6 @@ spring: redis: - database: 5 + database: 2 host: 120.46.212.231 port: 6379 password: root @@ -18,8 +18,8 @@ #MQTT-褰撳墠瀹㈡埛绔殑鍞竴鏍囪瘑 clientid: mqtt_publish default_topic: TEST #褰撳墠瀹㈡埛绔殑榛樿涓婚(澶у鏁版椂鍊欐病浠�涔堢敤) - #鍙戦�佽秴鏃舵椂闂� - mqtt.timeout: 1000 + #杩炴帴瓒呮椂鏃堕棿 + timeout: 3000 #蹇冭烦鏃堕棿 keepalive: 10 connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿 diff --git a/collect/src/main/resources/application.yml b/collect/src/main/resources/application.yml index e77ea4f..4eff48d 100644 --- a/collect/src/main/resources/application.yml +++ b/collect/src/main/resources/application.yml @@ -46,6 +46,8 @@ mapper-locations: classpath:mapper/*.xml type-aliases-package: com.qianwen.mdc.collect.entity.iotdb,com.qianwen.mdc.collect.entity.mgr #com.qianwen.mdc.collect.entity.iotdb +#绂荤嚎鍒ゅ畾鏃堕暱(姣),瓒呰繃杩欎釜鏃堕暱鏃犻噰闆嗘暟鎹垽瀹氫负绂荤嚎 +offlineConfigDuration: 3000000 wfg: # 璇蜂竴瀹氭敞鎰忥紒 WorkerIdBitLength + SeqBitLength + DataCenterIdBitLength <= 22 # 1琛ㄧず闆姳婕傜Щ绠楁硶锛�2琛ㄧず浼犵粺闆姳绠楁硶 diff --git a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml index ea817bd..cbe1a03 100644 --- a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml +++ b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml @@ -12,12 +12,8 @@ </resultMap> <!-- n,v,workstation_id --> - <select id="mylist" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam" - parameterType="java.lang.Long"> - select n,v,workstation_id from root.f2.process_param* - where workstation_id=#{workstationId} - order by time asc limit 1 align by device - + <select id="lastParamByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam"> + select n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_* order by time desc limit 1 align by device </select> </mapper> -- Gitblit v1.9.3