From 859d6321b1b1c606de09e9b6a6286aaeace638fe Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期三, 16 十月 2024 21:03:52 +0800 Subject: [PATCH] 接收采集数据,增加了使用配置数据点限制,配置了的数据点才会保存。未配置则丢弃 --- collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java | 42 +++++++++++++++++++++++++++++++----------- 1 files changed, 31 insertions(+), 11 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java index 63e7ee7..3246a2f 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -16,6 +15,7 @@ import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.utils.redis.RedisUtil; +import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; @Service public class IOTMqttReceiveService { @@ -26,13 +26,14 @@ @Autowired private PackedDataService packedDataService; @Autowired - private RedisUtil redisUtil; + private RedisUtil redisUtil; + @Autowired + private WorkstationDatapointsService dpService; /** * 澶勭悊鏀跺埌鐨勬秷鎭�,瀵瑰簲TelemetryDataPostingConsumer * @param payload */ public void handle(String payload) { - //System.out.println("Received message122: " + payload); //瑙f瀽娑堟伅 List<TelemetryData> teleList = parsePayload(payload); @@ -48,22 +49,32 @@ */ List<TelemetryData> parsePayload(String payload){ List<TelemetryData> dtList = new ArrayList<TelemetryData> (); - //鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d - //澶氭潯鏍煎紡锛歿"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} + //澶氭潯 鏁版嵁鏍煎紡锛歿"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174鏄簲鐢╥d //瑙f瀽娑堟伅 name,value褰㈠紡锛屽n=output,v=11 JSONObject jsonObj = JSONObject.parseObject(payload); Set<String> keySet = jsonObj.keySet(); String[] keys = keySet.toArray(new String[] {}); + //WorkstationDatapointsVO dpVo; + final String NEWDP_SUFFIX = "_n";//璁$畻瑙勫垯浣跨敤涔嬪悗鏂版暟鎹偣鐨勭粨灏� for(String key : keys) { String appId = key;//iot绯荤粺涓殑搴旂敤id锛屾湰搴旂敤涓簲璇ョ敤琛ㄥ幓瀵瑰簲 - long workstationId = getWorkstationIdByAppId(appId); + + + //TODO 鑾峰彇宸ヤ綅鏁版嵁鐐归厤缃�,鍙繚瀛橀厤缃ソ鐨勬暟鎹偣锛屾病鏈夐厤缃殑閲囬泦鏁版嵁鎶涘純銆� + final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); + if(dpVo == null) { + //宸ヤ綅娌℃湁瀹氫箟杩囨暟鎹偣鎴栬�卆ppId涓嶅尮閰� + log.warn("appId={}鏈壘鍒版暟鎹偣瀹氫箟璁板綍锛屼涪寮冩暟鎹�",appId); + continue; + } TelemetryData tdata = new TelemetryData(); - tdata.setWorkstationId(workstationId); - + //tdata.setWorkstationId(workstationId); + tdata.setAppId(appId); + tdata.setWorkstationId(dpVo.getWorkstationId()); JSONArray dtArr = jsonObj.getJSONArray(appId); for(int i=0;i<dtArr.size();i++) { @@ -81,6 +92,10 @@ if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) { oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX); } + if(!dpVo.containsDataPoint(oriValueKey)) { + //濡傛灉涓嶅瓨鍦ㄨ鏁版嵁鐐归厤缃紝璇ユ暟鎹洿鎺ュ拷鐣� + return; + } tdataItem.addPoint(oriValueKey,values.getString(valueKey));//浣跨敤鍘熷閰嶇疆鐐逛繚鎸佷繚瀛樻暟鎹� }); @@ -98,11 +113,16 @@ * @param appId * @return */ - public long getWorkstationIdByAppId(String appId) { + /* + public Long getWorkstationIdByAppId(String appId) { Object wid = redisUtil.hget("workstation-appid-map", appId); - String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId)); - + + String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY)); + if(ObjectUtil.isEmpty(workstationId)) { + return null; + } return Long.parseLong(workstationId); } + */ } -- Gitblit v1.9.3