From 04d53749b21921c9bceebe120d170c2ee6e533af Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 13 十一月 2024 21:21:46 +0800
Subject: [PATCH] 增加离线检查定时任务的逻辑
---
collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java | 5
collect/src/main/resources/application.yml | 2
collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java | 30 +++
collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java | 6
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java | 19 -
collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java | 5
collect/src/main/resources/application-dev.yml | 6
collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java | 2
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java | 5
collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java | 106 ++++++++++
collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java | 19 +
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java | 51 ++--
collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java | 79 +++++++
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 1
collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java | 4
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java | 11
collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java | 15 -
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java | 100 ++++++++++
collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java | 103 ++++++++++
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml | 8
collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java | 10
collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java | 2
22 files changed, 512 insertions(+), 77 deletions(-)
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
index dc91d11..7f40c31 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
@@ -107,12 +107,7 @@
}
return result;
}
- /*
- public static Boolean clearWorkStationCache() {
- String redisKey = "posting:workstation".concat("::").concat(WORKSTATION_ALL);
- return bladeRedis.del(redisKey);
- }
- */
+
/**
* 鑾峰彇鎸囧畾鏃ユ湡鐨勬棩鍘嗕唬鐮�
* @param workstationId
@@ -135,15 +130,15 @@
return calendarCode;
}
- public GlobalWcsOfRps getWorkstationWcsSetting(Long workstationId, String code) {
+ public GlobalWcsOfRps getWorkstationWcsSetting(Long workstationId, String deviceStatusCode) {
String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ID).concat(workstationId.toString()
.concat(WCS_SETTING));
//GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hGet(redisKey, code);
- GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hget(redisKey, code);
+ GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hget(redisKey, deviceStatusCode);
if (wcsSetting == null) {
wcsSetting = globalWcsOfRpsMapper.selectOne(Wrappers.<GlobalWcsOfRps>lambdaQuery()
- .eq(GlobalWcsOfRps::getCode, code)
+ .eq(GlobalWcsOfRps::getCode, deviceStatusCode)
.isNull(GlobalWcsOfRps::getPrecondition));
if(wcsSetting == null) {
wcsSetting = new GlobalWcsOfRps();
@@ -153,7 +148,7 @@
//wcsSetting = Func.isNotEmpty(wcsSetting) ? wcsSetting : GlobalWcsOfRps.builder().rps(0).isPlan(0).build();
//bladeRedis.hSet(redisKey, code, wcsSetting);
//bladeRedis.expire(redisKey, Duration.ofDays(1L));
- redisUtil.hset(redisKey, code, wcsSetting, Duration.ofDays(1L).getSeconds());
+ redisUtil.hset(redisKey, deviceStatusCode, wcsSetting, Duration.ofDays(1L).getSeconds());
}
return wcsSetting;
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
index e96c3f6..592d1e1 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -41,6 +41,10 @@
@Value("${mqtt.password:}")
private String mqttPassword;
+
+ @Value("${mqtt.timeout:1000}")
+ private int timeout;
+
@Autowired
private IOTMqttReceiveService recService;
@Autowired
@@ -73,6 +77,7 @@
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884"
+ options.setConnectionTimeout(timeout);
if(ObjectUtil.isNotEmpty(mqttUserName)) {
options.setUserName(mqttUserName);
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java
new file mode 100644
index 0000000..27b05b9
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java
@@ -0,0 +1,19 @@
+package com.qianwen.mdc.collect.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttProperties {
+ private String host;
+ private String username;
+ private String password;
+ private String connectionTimeout;
+
+ private int keepalive;
+ private String dataReceiveTopic;
+ private int timeout;
+ private boolean cleansession;
+
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java b/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java
index ecdf315..44cf5ca 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java
@@ -36,4 +36,9 @@
* 鍛婅妯℃澘鍚嶇О alarm
*/
public static final String TEMPLATE_ALARM = "alarm";
+
+ /**
+ * 杩囩▼鍙傛暟琛ㄦā鏉�
+ */
+ public static final String TEMPLATE_PROCESS_PARAM = "process_param";
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java
index acebd02..8721b46 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java
@@ -23,6 +23,7 @@
import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper;
import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
+import com.qianwen.mdc.collect.service.DeviceOfflineCheckService;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.IotDBCommonService;
@@ -37,7 +38,8 @@
public class CalController {
@Autowired
private TimeSliceCache timeSliceCache;
-
+ @Autowired
+ DeviceOfflineCheckService offService;
String calCode= "1";
@GetMapping("/gett")
@@ -58,4 +60,10 @@
return "settOK,targetDate="+targetDate;
}
+
+ @GetMapping("/offline")
+ public Object off(long wid) {
+ offService.handleWorkstationOffline(wid);
+ return "1";
+ }
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java
index a88977d..97e4bb7 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java
@@ -53,9 +53,9 @@
@GetMapping("/last")
public Object last() {
- List<ProcessParam> list1= mapper.mylist(1656819188967653378L);
- System.out.print(list1);
- return list1;
+ //ProcessParam last= mapper.lastParamInDuration(1656819188967653378L,300000);
+ //System.out.print(list1);
+ return "dd";
}
@DS("iotdb")
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java b/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java
index 15571e5..60fac22 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java
@@ -1,7 +1,9 @@
package com.qianwen.mdc.collect.dto;
import java.util.Date;
-
+/**
+ * 鐢熶骇鏃ュ巻鐨勭彮鍒朵俊鎭�
+ */
public class CalendarShiftInfoDTO {
private Long workstationId;
private String code;
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java
new file mode 100644
index 0000000..3c0e4e5
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java
@@ -0,0 +1,30 @@
+package com.qianwen.mdc.collect.job;
+
+import javax.annotation.Resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.qianwen.mdc.collect.service.DeviceOfflineCheckService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+
+@Component
+public class DeviceOfflineStatusCheckJob {
+ private static final Logger log = LoggerFactory.getLogger(DeviceOfflineStatusCheckJob.class);
+ @Resource
+ private DeviceOfflineCheckService offlineCheckService;
+
+
+ @XxlJob("offlineCheckJobHandler")
+ public ReturnT<String> offlineCheckJobHandler(String param) throws Exception {
+ XxlJobLogger.log("瀹氭椂鐩戞帶宸ヤ綅鏄惁绂荤嚎",param);
+
+ offlineCheckService.checkOffline();
+ log.info("绂荤嚎鐘舵�佸鐞嗙粨鏉�");
+ XxlJobLogger.log("瀹氭椂鐩戞帶宸ヤ綅鏄惁绂荤嚎瀹屾垚", new Object[0]);
+ return ReturnT.SUCCESS;
+ }
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java b/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java
index f0ac7cc..25aa371 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java
@@ -1,7 +1,5 @@
package com.qianwen.mdc.collect.mapper.iotdb;
-import java.util.List;
-
import org.apache.ibatis.annotations.Param;
import com.baomidou.dynamic.datasource.annotation.DS;
@@ -10,7 +8,12 @@
@DS("iotdb")
public interface ProcessParamMapper {
-
- List<ProcessParam> mylist(@Param("workstationId") Long workstationId);
+ /**
+ * 鏌ヨ宸ヤ綅鍦ㄦ渶杩慸uration姣鍐呮渶鍚庝竴涓弬鏁�
+ * @param workstationId
+ * @param duration
+ * @return
+ */
+ ProcessParam lastParamByWorstationId(@Param("workstationId") Long workstationId);
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java
index 8724fa7..39b4eb0 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java
@@ -110,6 +110,6 @@
shiftEndTime = endTime;
}
CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = CalendarShiftTimeSlicesDTO.builder().shiftTimeType(productionCalendarDaytime.getShiftType().toString()).endTime(LocalDateTimeUtils.LocalDateTimeToDate(shiftEndTime)).startTime(LocalDateTimeUtils.LocalDateTimeToDate(shiftStartTime)).shiftIndex(productionCalendarDaytime.getShiftIndex()).shiftTimeType(productionCalendarDaytime.getShiftType().toString()).factoryDate(LocalDateTimeUtils.formatTimeLocalDate(productionCalendarDaytime.getCalendarDate(), "yyyy-MM-dd")).factoryMonth(productionCalendarDaytime.getMonth()).factoryWeek(productionCalendarDaytime.getWeek()).factoryYear(productionCalendarDaytime.getYear()).build();
- timeSlicesDTOMap.put(Integer.valueOf(minutesOfDay), calendarShiftTimeSlicesDTO);
+ timeSlicesDTOMap.put(minutesOfDay, calendarShiftTimeSlicesDTO);
}
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index f599308..73fe491 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -140,7 +140,6 @@
Tablet tablet = new Tablet(deviceId, schemas);
for(TypedTelemetryData tdata : typeList) {
-
rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, tdata.getTime());
tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java
new file mode 100644
index 0000000..d73cce8
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java
@@ -0,0 +1,100 @@
+package com.qianwen.mdc.collect.service;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import com.qianwen.mdc.collect.cache.WorkstationCache;
+import com.qianwen.mdc.collect.dto.WorkstationDTO;
+import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
+import com.qianwen.mdc.collect.entity.iotdb.ProcessParam;
+import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum;
+import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper;
+import com.qianwen.mdc.collect.vo.FactoryDataVO;
+
+import cn.hutool.core.util.ObjectUtil;
+/**
+ * 璁惧绂荤嚎澶勭悊鏈嶅姟,瓒呰繃涓�瀹氭椂闂存棤閲囬泦鏁版嵁鍒ゅ畾涓虹绾匡紝鎻掑叆绂荤嚎鐘舵�佹暟鎹�
+ */
+@Service
+public class DeviceOfflineCheckService{
+ private static final Logger log = LoggerFactory.getLogger(DeviceOfflineCheckService.class);
+ /**
+ * 绂荤嚎鍒ゅ畾鏃堕暱锛岄粯璁�5鍒嗛挓
+ */
+ @Value("${offlineConfigDuration:3000000}")
+ private long offlineConfigDuration;
+ public static final String OFFLINE_VALUE = "4";//绂荤嚎鐘舵�佸��
+ @Autowired
+ private ProcessParamService paramService;
+
+ @Autowired
+ private WorkstationCache workstationCache;
+ @Autowired
+ private ProcessParamMapper processParamMapper;
+
+ @Autowired
+ private FactoryDataService factoryDataService;
+ /**
+ * 淇濆瓨鐘舵�佸浐瀹氱偣鏁版嵁(state_{workstationId})
+ * @param workstationIdList
+ */
+ public void checkOffline() {
+ //灏嗘暟鎹寜鐓у伐浣峣d鍒嗙粍
+ Map<String, WorkstationDTO> workstationsMap = workstationCache.getWorkstations();
+ if (ObjectUtil.isEmpty(workstationsMap)) {
+ log.info("缂撳瓨鏃犺澶囨暟鎹紝閫�鍑�");
+ return;
+ }
+
+ Set<String> workstationIds = workstationsMap.keySet();
+
+ log.info("绂荤嚎妫�鏌ュ伐浣嶆�绘暟:{}",workstationsMap.size());
+
+ for(String workstationId :workstationIds) {
+ log.info("寮�濮嬫鏌ュ伐浣峽}鐨勭绾跨姸鎬�",workstationId);
+ Long wid = Long.parseLong(workstationId);
+ handleWorkstationOffline(wid);
+ }
+
+ }
+ /**
+ * 妫�鏌ヤ竴涓伐浣嶆槸鍚︾绾匡紝绂荤嚎鍒欏~鍏呯绾跨姸鎬佹暟鎹�
+ * @param workstationId
+ */
+ public void handleWorkstationOffline(long workstationId) {
+ ProcessParam lastParam = processParamMapper.lastParamByWorstationId(workstationId);
+
+ long nowMills = System.currentTimeMillis();
+ long onlineRange = nowMills - offlineConfigDuration;
+ if(lastParam != null && lastParam.getTime()> onlineRange) {
+ //鏈夋暟鎹紝涓斿湪鍒ゅ畾鏃堕棿鍐�-銆嬪湪绾�
+ return;
+ }
+ //TODO 杩欓噷涓�鐩寸绾挎槸浠�涔堟暟鎹�
+
+ //鏃堕棿娈靛唴鏃犲弬鏁帮紝璇存槑璁惧娌¢噰闆嗘暟鎹垽瀹氫负绂荤嚎锛屾彃鍏ョ姸鎬侊紝鐒跺悗鍙戦�乺ealTime娑堟伅缁檓dc
+
+ ProcessParam statusParam = new ProcessParam();
+ statusParam.setTime(nowMills);
+ statusParam.setN("DeviceStatus");
+ statusParam.setV(OFFLINE_VALUE);
+ statusParam.setWorkstationId(workstationId);
+ if(lastParam == null || !lastParam.getN().equals("DeviceStatus")) {
+ //涔嬪墠鏃犱换浣曢噰闆嗙殑鏁版嵁(鎴栬�呬笂涓�鏉′笉鏄姸鎬佹暟鎹�)锛屾柊鍔犱竴鏉$绾跨姸鎬佹暟鎹�
+ paramService.insertProcessParam(statusParam);
+ }
+
+ //閫氱煡mdc鏇存柊瀹炴椂鐘舵��
+ paramService.sendRealtimeDataMsg(statusParam);
+
+ }
+
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
index ca31fd4..8db7e81 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
@@ -289,29 +289,36 @@
return default24HourPointDTOList;
}
+ /**
+ * 濉厖璁惧鐘舵�佺殑鐝淇℃伅
+ * @param calendarShiftList
+ * @param default24HourPointDTOList
+ * @param state
+ */
private void packCalendarShiftInfoForTimePoint(List<CalendarShiftInfoDTO> calendarShiftList, List<DeviceState> default24HourPointDTOList, DeviceState state) {
- if (Func.isNotEmpty(calendarShiftList)) {
- CalendarShiftInfoDTO relatedShift = calendarShiftList.stream().filter(item -> {
- return item.getStartTime().getTime() <= state.getTime().longValue() && item.getEndTime().getTime() > state.getTime().longValue();
- }).findFirst().orElse(null);
-
- if (Func.isNotEmpty(relatedShift)) {
- state.setShiftIndex(relatedShift.getShiftIndex());
- state.setShiftTimeType(relatedShift.getShiftTimeType());
- state.setFactoryYear(relatedShift.getFactoryYear());
- state.setFactoryMonth(relatedShift.getFactoryMonth());
- state.setFactoryWeek(relatedShift.getFactoryWeek());
- String factoryDate = relatedShift.getFactoryDate();
- String[] split = Func.split(factoryDate, "-");
- state.setFactoryDate(Integer.valueOf(String.join("", split)));
- state.setIsDeleted(Boolean.FALSE);
- default24HourPointDTOList.add(state);
- return;
- }
- log.warn("宸ヤ綅{} 鏃ュ巻{} 鏈壘鍒版暣鐐圭彮娆′俊鎭�", state.getWorkstationId(), state.getCalendarCode());
- return;
+ if (Func.isEmpty(calendarShiftList)) {
+ log.error("宸ヤ綅{} 鏃ュ巻{} 鏃犳棩鏈�:[{}]鐨勭彮娆′俊鎭�", new Object[]{state.getWorkstationId(), state.getCalendarCode(), state.getTime()});
+ return;
+ }
+
+ CalendarShiftInfoDTO relatedShift = calendarShiftList.stream().filter(item -> {
+ return item.getStartTime().getTime() <= state.getTime() && item.getEndTime().getTime() > state.getTime();
+ }).findFirst().orElse(null);
+
+ if (Func.isNotEmpty(relatedShift)) {
+ state.setShiftIndex(relatedShift.getShiftIndex());
+ state.setShiftTimeType(relatedShift.getShiftTimeType());
+ state.setFactoryYear(relatedShift.getFactoryYear());
+ state.setFactoryMonth(relatedShift.getFactoryMonth());
+ state.setFactoryWeek(relatedShift.getFactoryWeek());
+ String factoryDate = relatedShift.getFactoryDate();
+ String[] split = Func.split(factoryDate, "-");
+ state.setFactoryDate(Integer.valueOf(String.join("", split)));
+ state.setIsDeleted(false);
+ default24HourPointDTOList.add(state);
+ }else {
+ log.warn("宸ヤ綅{} 鏃ュ巻{} 鏈壘鍒板浐瀹氱偣鐝淇℃伅", state.getWorkstationId(), state.getCalendarCode());
}
- log.error("宸ヤ綅{} 鏃ュ巻{} 鏃犺嚜鐒跺ぉ{}鐝淇℃伅", new Object[]{state.getWorkstationId(), state.getCalendarCode(), state.getTime()});
}
//涓嶄竴瀹氳兘鐢ㄤ笂
@@ -325,7 +332,7 @@
workstationState.setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX);
workstationState.setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE);
workstationState.setFeedbackPointType(FeedbackTimePointEnum.NO_FEED_BACK_POINT.getValue());
- workstationState.setIsDeleted(Boolean.FALSE);
+ workstationState.setIsDeleted(false);
}
}
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
index a59fce6..8a68ca3 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
@@ -1,10 +1,6 @@
package com.qianwen.mdc.collect.service;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -18,24 +14,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import com.google.common.collect.Lists;
-import com.qianwen.core.tool.utils.Func;
-import com.qianwen.core.tool.utils.SpringUtil;
-import com.qianwen.mdc.collect.cache.WorkstationCache;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
-import com.qianwen.mdc.collect.constants.CommonConstant;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
-import com.qianwen.mdc.collect.dto.CalendarShiftInfoDTO;
-import com.qianwen.mdc.collect.dto.WorkstationDTO;
import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
-import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum;
-import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper;
-import com.qianwen.mdc.collect.utils.LocalDateTimeUtils;
-
-import cn.hutool.core.date.DatePattern;
-import cn.hutool.core.date.DateTime;
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.core.util.ObjectUtil;
/**
* 璁惧鐘舵�佹櫘閫氭湇鍔�
*/
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java
new file mode 100644
index 0000000..0afa9c1
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java
@@ -0,0 +1,106 @@
+package com.qianwen.mdc.collect.service;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.google.common.collect.Sets;
+import com.qianwen.mdc.collect.cache.TimeSliceCache;
+import com.qianwen.mdc.collect.cache.WorkstationCache;
+import com.qianwen.mdc.collect.constants.CommonConstant;
+import com.qianwen.mdc.collect.domain.TelemetryData;
+import com.qianwen.mdc.collect.domain.TelemetryDataItem;
+import com.qianwen.mdc.collect.dto.CacheBuildDTO;
+import com.qianwen.mdc.collect.dto.CalendarShiftTimeSlicesDTO;
+import com.qianwen.mdc.collect.dto.PackedTelemetryData;
+import com.qianwen.mdc.collect.handler.PackedTelemetryDataHandlerSelector;
+import com.qianwen.mdc.collect.handler.TelemetryDataHandler;
+import com.qianwen.mdc.collect.utils.LocalDateTimeUtils;
+import com.qianwen.mdc.collect.vo.FactoryDataVO;
+
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.ObjectUtil;
+
+/**
+ * 宸ュ巶鏁版嵁鑾峰彇
+ */
+@Service
+public class FactoryDataService {
+ private static final Logger log = LoggerFactory.getLogger(FactoryDataService.class);
+
+ private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
+
+
+ @Autowired
+ private WorkstationCache workstationCache;
+ @Autowired
+ private TimeSliceCache timeSliceCache;
+
+ /**
+ * 濉厖鐢熶骇鏃ュ巻code锛屼互鍙奻actoryyear
+ *
+ * @param pdata
+ */
+ public FactoryDataVO getFactoryData(long workstationId,long dataCollectTime) {
+ //鍘熸潵鍦═elemetryPropertyWrapper.packWorkstationCalendarInfo涓畬鎴�
+ FactoryDataVO result = new FactoryDataVO();
+
+ Date collectTime = new Date(dataCollectTime);
+ String calendarCode = workstationCache.getWorkstationCalendarCodeForDate(workstationId, DateUtil.formatDate(DateUtil.date(collectTime)));
+ if (ObjectUtil.isNotEmpty(calendarCode)) {
+ result.setCalendarCode(calendarCode);
+ } else {
+ //telemetryData.setCalendarCode("#default#");
+ result.setCalendarCode("#default#");
+ }
+
+ // TelemetryPropertyWrapper涓璸ackWorkstationCalendarInfo
+ //------------start
+ boolean isDefaultCalendar = true;
+ if (!"#default#".equals(result.getCalendarCode())) {
+ CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(result.getCalendarCode(), collectTime);//浠巖edis涓幏寰楁棩鍘嗙殑鏃堕棿鍒囩墖
+ if (ObjectUtil.isEmpty(calendarShiftTimeSlicesDTO)) {//濡傛灉娌℃湁鏃堕棿鍒囩墖锛屽垯浣跨敤TimeSliceCache.build(cacheBuildDTO);鏋勫缓
+ LocalDate targetDate = Instant.ofEpochMilli(dataCollectTime).atZone(ZoneOffset.systemDefault()).toLocalDate();
+ CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).calendarCode(calendarCode).targetDate(targetDate).build();
+ timeSliceCache.build(cacheBuildDTO);
+ calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(result.getCalendarCode(), collectTime);
+ }
+ if (ObjectUtil.isNotEmpty(calendarShiftTimeSlicesDTO)) {
+ result.setShiftIndex(calendarShiftTimeSlicesDTO.getShiftIndex());
+ result.setShiftTimeType(Integer.valueOf(calendarShiftTimeSlicesDTO.getShiftTimeType()));
+ result.setFactoryDate(Integer.valueOf(calendarShiftTimeSlicesDTO.getFactoryDate().replaceAll("-", "")));
+ result.setFactoryWeek(calendarShiftTimeSlicesDTO.getFactoryWeek());
+ result.setFactoryMonth(calendarShiftTimeSlicesDTO.getFactoryMonth());
+ result.setFactoryYear(calendarShiftTimeSlicesDTO.getFactoryYear());
+ isDefaultCalendar = false;
+ }
+ }
+
+ //鏃犳棩鍘嗗垏鐗囷紝浣跨敤閲囬泦鏃堕棿濉厖factoryYear锛宮onth锛宒ate锛寃eek鍑犱釜灞炴��
+ if (isDefaultCalendar) {
+ log.info("鏃犳棩鍘嗗垏鐗�");
+ LocalDate localDate = Instant.ofEpochMilli(dataCollectTime).atZone(ZoneOffset.systemDefault()).toLocalDate();
+ result.setFactoryDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(collectTime)));
+ result.setFactoryYear(DateUtil.year(collectTime));
+ result.setFactoryWeek(LocalDateTimeUtils.getWeek(localDate));
+ result.setFactoryMonth(DateUtil.month(collectTime) + 1);
+ result.setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX);
+ result.setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE);
+ }
+
+
+ return result;
+ }
+
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java
index fe8229d..9984718 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java
@@ -51,7 +51,7 @@
try {
iotdbCfg.getSessionPool().setSchemaTemplate(template, deviceId);
} catch (Exception e) {
- logger.error("鑾峰彇妯℃澘浣跨敤閿欒,template="+template+",deviceId="+deviceId,e);
+ logger.error("涓篸eviceId璁剧疆妯℃澘閿欒,template="+template+",deviceId="+deviceId,e);
}
}
}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
index 012bd36..d53dc1f 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
@@ -47,8 +47,6 @@
private WorkstationCache workstationCache;
@Autowired
private TimeSliceCache timeSliceCache;
- @Autowired
- private WorkstationDatapointsService dpService;
static {
PROCESS_PARAM_MAP.put(1, "STATE");
@@ -130,9 +128,6 @@
//telemetryData.setCalendarCode("#default#");
pdata.setCalendarCode("#default#");
}
-
- //pdata.setShiftIndex(1);//涓存椂
- //pdata.setShiftTimeType(1);//涓存椂
// TelemetryPropertyWrapper涓璸ackWorkstationCalendarInfo
//------------start
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java
new file mode 100644
index 0000000..e923804
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java
@@ -0,0 +1,103 @@
+package com.qianwen.mdc.collect.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.alibaba.fastjson.JSONObject;
+import com.qianwen.core.tool.utils.ObjectUtil;
+import com.qianwen.mdc.collect.config.IotDBSessionConfig;
+import com.qianwen.mdc.collect.constants.IOTDBConstant;
+import com.qianwen.mdc.collect.domain.TelemetryData;
+import com.qianwen.mdc.collect.domain.TelemetryDataItem;
+import com.qianwen.mdc.collect.entity.iotdb.ProcessParam;
+import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
+
+/**
+ * 閲囬泦鏁版嵁澶勭悊鍏ュ簱
+ */
+@Service
+public class ProcessParamService {
+ private static final Logger log = LoggerFactory.getLogger(ProcessParamService.class);
+
+ private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
+ @Autowired
+ private IotDBSessionConfig iotdbConfig;
+ @Autowired
+ private IotDBCommonService iotDBCommonService;
+ @Autowired
+ private MqttMessageSender mqttMessageSender;
+
+
+ public static List<MeasurementSchema> schemas = new ArrayList<>();
+ static {
+ schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
+ schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
+ schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
+ }
+ /**
+ * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚�
+ */
+ public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
+
+
+ public void insertProcessParam(ProcessParam param) {
+ String deviceId = generateDeviceId(param.getWorkstationId(),param.getN());
+
+ iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId);
+
+ Tablet tablet = new Tablet(deviceId, schemas);
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, param.getTime());
+ tablet.addValue("workstation_id",rowIndex,param.getWorkstationId());
+ tablet.addValue("n",rowIndex,param.getN());
+ tablet.addValue("v",rowIndex,param.getV());
+ try {
+ iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+ //updateLastParam(dt.getWorkstationId(),typeList);
+ } catch (Exception e) {
+ log.error("IOTDB鍏ュ簱澶辫触",e);
+ }finally {
+ //iotdbConfig.getSessionPool().clo1se();
+ }
+ }
+
+
+ /**
+ * 鍙戦�佸疄鏃舵暟鎹秷鎭�
+ * @param param
+ */
+ public void sendRealtimeDataMsg(ProcessParam param) {
+
+ //鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡
+
+ JSONObject json = new JSONObject();
+ json.put("workstationId",param.getWorkstationId());
+ json.put("name", param.getN());
+ json.put("value", param.getV());
+ json.put("time", param.getTime());
+
+ mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
+
+ }
+
+
+ String generateDeviceId(long workstationId,String propertyName) {
+ return IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_PROCESS_PARAM+"_" + workstationId+"_"+propertyName;
+ }
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java b/collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java
new file mode 100644
index 0000000..d5d12bc
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java
@@ -0,0 +1,79 @@
+package com.qianwen.mdc.collect.vo;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+
+import cn.hutool.core.util.ObjectUtil;
+import io.swagger.annotations.ApiModelProperty;
+
+
+/**
+ * 宸ュ巶鏁版嵁VO
+ */
+public class FactoryDataVO implements Serializable{
+
+ /**
+ * 搴忓垪鍖栵紝搴斾负闇�瑕乻pring缂撳瓨
+ */
+ private static final long serialVersionUID = 6558493027948435061L;
+
+ private String calendarCode;
+ private Integer factoryYear;
+ private Integer factoryMonth;
+ private Integer factoryWeek;
+ private Integer factoryDate;
+ private Integer shiftIndex;
+ private Integer shiftTimeType;
+ public String getCalendarCode() {
+ return calendarCode;
+ }
+ public void setCalendarCode(String calendarCode) {
+ this.calendarCode = calendarCode;
+ }
+ public Integer getFactoryYear() {
+ return factoryYear;
+ }
+ public void setFactoryYear(Integer factoryYear) {
+ this.factoryYear = factoryYear;
+ }
+ public Integer getFactoryMonth() {
+ return factoryMonth;
+ }
+ public void setFactoryMonth(Integer factoryMonth) {
+ this.factoryMonth = factoryMonth;
+ }
+ public Integer getFactoryWeek() {
+ return factoryWeek;
+ }
+ public void setFactoryWeek(Integer factoryWeek) {
+ this.factoryWeek = factoryWeek;
+ }
+ public Integer getFactoryDate() {
+ return factoryDate;
+ }
+ public void setFactoryDate(Integer factoryDate) {
+ this.factoryDate = factoryDate;
+ }
+ public Integer getShiftIndex() {
+ return shiftIndex;
+ }
+ public void setShiftIndex(Integer shiftIndex) {
+ this.shiftIndex = shiftIndex;
+ }
+ public Integer getShiftTimeType() {
+ return shiftTimeType;
+ }
+ public void setShiftTimeType(Integer shiftTimeType) {
+ this.shiftTimeType = shiftTimeType;
+ }
+
+
+
+}
+
diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml
index a824160..66a4cb5 100644
--- a/collect/src/main/resources/application-dev.yml
+++ b/collect/src/main/resources/application-dev.yml
@@ -1,6 +1,6 @@
spring:
redis:
- database: 5
+ database: 2
host: 120.46.212.231
port: 6379
password: root
@@ -18,8 +18,8 @@
#MQTT-褰撳墠瀹㈡埛绔殑鍞竴鏍囪瘑
clientid: mqtt_publish
default_topic: TEST #褰撳墠瀹㈡埛绔殑榛樿涓婚(澶у鏁版椂鍊欐病浠�涔堢敤)
- #鍙戦�佽秴鏃舵椂闂�
- mqtt.timeout: 1000
+ #杩炴帴瓒呮椂鏃堕棿
+ timeout: 3000
#蹇冭烦鏃堕棿
keepalive: 10
connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿
diff --git a/collect/src/main/resources/application.yml b/collect/src/main/resources/application.yml
index e77ea4f..4eff48d 100644
--- a/collect/src/main/resources/application.yml
+++ b/collect/src/main/resources/application.yml
@@ -46,6 +46,8 @@
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.qianwen.mdc.collect.entity.iotdb,com.qianwen.mdc.collect.entity.mgr
#com.qianwen.mdc.collect.entity.iotdb
+#绂荤嚎鍒ゅ畾鏃堕暱(姣),瓒呰繃杩欎釜鏃堕暱鏃犻噰闆嗘暟鎹垽瀹氫负绂荤嚎
+offlineConfigDuration: 3000000
wfg:
# 璇蜂竴瀹氭敞鎰忥紒 WorkerIdBitLength + SeqBitLength + DataCenterIdBitLength <= 22
# 1琛ㄧず闆姳婕傜Щ绠楁硶锛�2琛ㄧず浼犵粺闆姳绠楁硶
diff --git a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml
index ea817bd..cbe1a03 100644
--- a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml
+++ b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml
@@ -12,12 +12,8 @@
</resultMap>
<!-- n,v,workstation_id -->
- <select id="mylist" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam"
- parameterType="java.lang.Long">
- select n,v,workstation_id from root.f2.process_param*
- where workstation_id=#{workstationId}
- order by time asc limit 1 align by device
-
+ <select id="lastParamByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam">
+ select n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_* order by time desc limit 1 align by device
</select>
</mapper>
--
Gitblit v1.9.3