From 03fc7dc5d536f08e711f7ddb79f720c2e1ded16b Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期日, 13 四月 2025 21:59:12 +0800 Subject: [PATCH] 增加状态核实;增加模板是否set的缓存优化 --- collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java | 74 ++++++++++++++++++++++++++++++++++++- 1 files changed, 72 insertions(+), 2 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java index 31b7301..98d9f25 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -12,9 +13,13 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; +import com.qianwen.mdc.collect.enums.DefaultWcsEnum; import com.qianwen.mdc.collect.utils.redis.RedisUtil; +import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO; +import com.qianwen.mdc.collect.vo.HistoryDeviceState; import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; @Service @@ -28,7 +33,8 @@ private PackedDataService packedDataService; @Autowired private WorkstationDatapointsService dpService; - + @Autowired + private DeviceStatusCheckCfgLoader cfgLoader; /** * 澶勭悊鏀跺埌鐨勬秷鎭�,瀵瑰簲TelemetryDataPostingConsumer * @param payload @@ -37,11 +43,75 @@ //瑙f瀽娑堟伅 List<TelemetryData> teleList = parsePayload(payload); + + checkDeviceStatus(teleList); + collectDataService.handleCollectData(teleList); packedDataService.handle(teleList); } + /** + * 妫�鏌ヨ澶囩姸鎬侊紝鏁版嵁锛屽鏋滄槸闂儊鐘舵��(杩愯,寰呮満鏉ュ洖鍒囨崲锛夛紝瑙勫垯鏄湪鏁版嵁琛ㄥ畾涔夌殑 + * @param teleList + */ + void checkDeviceStatus(List<TelemetryData> teleList){ + DeviceStateCheckConfigVO chkConfigVO = cfgLoader.getCheckConfigVOFromCache(); + long interval = cfgLoader.getCheckConfigVOFromCache().getInterval(); + int wcs; + + for(TelemetryData tdata:teleList) {//tdata.getWorkstationId() + for(TelemetryDataItem dataItem:tdata.getDataItems()) { + List<Map<String,String>> points = dataItem.getDataPoints(); + for(Map<String,String> pt : points) { + if(pt.containsKey(IOTDBConstant.DEVICE_STATUS_KEY)){ + String val = pt.get(IOTDBConstant.DEVICE_STATUS_KEY); + wcs = Integer.parseInt(val); + //!!!code鑼冨洿妫�鏌� + computeDeviceStatus(tdata.getWorkstationId(),wcs,dataItem.getTime(),pt,chkConfigVO); + } + } + } + } + //di.getTime() + //tdata.getWorkstationId(); + + + } + + /** + * 鏍稿疄骞惰绠楃姸鎬佸�硷紝濡傛灉鏈夌伅闂儊鎯呭喌锛屽垯鍒ゅ畾涓哄緟鏈� + * @param workstationId + * @param wcs 鐘舵�佸�� + * @param pointTime 鏁版嵁鐐圭殑鏃堕棿鎴� + * @param point 鏁版嵁鐐筴ey鍜寁al + * @param interval 妫�鏌ョ殑鏃堕棿璺ㄥ害锛屾绉� + */ + void computeDeviceStatus(long workstationId,int wcs,long pointTime,Map<String,String> point,DeviceStateCheckConfigVO chkConfigVO) { + + long interval = chkConfigVO.getInterval(); + if(wcs != DefaultWcsEnum.RUNNING.getCode() || !chkConfigVO.containsWorkstation(workstationId)) {//浠呴拡瀵硅繍琛岀姸鎬佺殑鎵嶈繘琛岀姸鎬佹牳瀹� + return; + } + + List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(workstationId);// DEVICE_STTUS_MAP.get(workstationId);//浣曟椂鍒濆鍖栧唴閮ㄦ暟鎹紵锛� + + //pointTime-interval 3绉掑唴 + long count = statusHistoryList.stream().filter(his -> { + return his.getTime()>(pointTime-interval) && his.getDeviceStatus() != DefaultWcsEnum.RUNNING.getCode(); + }).count(); + log.info("checkstatus count={}",count); + if(count > 0) { + //鐏棯鐑侊紝鍒ゅ畾涓哄緟鏈猴紝杩欓噷淇敼璁惧鐘舵�佺殑鏁版嵁 + point.put(IOTDBConstant.DEVICE_STATUS_KEY, DefaultWcsEnum.STANDBY.getCode()+""); + } + HistoryDeviceState his = new HistoryDeviceState(); + his.setTime(pointTime); + his.setDeviceStatus(wcs); + his.setWorkstationId(workstationId); + statusHistoryList.add(his); + + } /** * 瑙f瀽娑堟伅浣撳瓧绗︿覆鏈垪琛紝姣忎釜鍒楄〃椤规槸涓�涓伐浣嶇殑鏁版嵁 * @param payload @@ -62,7 +132,7 @@ //鑾峰彇宸ヤ綅鏁版嵁鐐归厤缃�,鍙繚瀛橀厤缃ソ鐨勬暟鎹偣锛屾病鏈夐厤缃殑閲囬泦鏁版嵁鎶涘純銆� final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); - //final WorkstationDatapointsVO dpVo = dpService.getDataPointByAppId(appId); + if(dpVo == null) { //宸ヤ綅娌℃湁瀹氫箟杩囨暟鎹偣鎴栬�卆ppId涓嶅尮閰� log.warn("appId={}鏈壘鍒版暟鎹偣瀹氫箟璁板綍锛屼涪寮冩暟鎹�",appId); -- Gitblit v1.9.3