From 859d6321b1b1c606de09e9b6a6286aaeace638fe Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期三, 16 十月 2024 21:03:52 +0800 Subject: [PATCH] 接收采集数据,增加了使用配置数据点限制,配置了的数据点才会保存。未配置则丢弃 --- collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java | 5 collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java | 63 +++++++ collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java | 59 +++--- collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java | 14 + collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java | 2 collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java | 13 + collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java | 143 +++++++++++++++++ collect/src/main/resources/application-dev.yml | 4 collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java | 31 ++- collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java | 11 + collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 18 +- collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java | 8 collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java | 42 +++- collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java | 65 ++++++++ 14 files changed, 413 insertions(+), 65 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java index df16c29..21800ca 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java @@ -21,9 +21,11 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.service.DeviceStateFixPointService; import com.qianwen.mdc.collect.service.IOTMqttReceiveService; import com.qianwen.mdc.collect.service.WorkstationAppMappingService; +import com.qianwen.mdc.collect.service.WorkstationDatapointsService; import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService; import cn.hutool.core.date.DateTime; @@ -40,19 +42,22 @@ @Value("${mqtt.password:}") private String mqttPassword; - @Autowired private IOTMqttReceiveService recService; @Autowired private DeviceStateFixPointService stateFixPointService; - @Autowired private WorkstationFeedbackService workstationFeedbackService; - @Autowired private WorkstationAppMappingService workstationAppMappingService; - public static final String COLLECT_DATA_TOPIC = "forward/test"; + @Autowired + private WorkstationDatapointsService dpService; + /** + * 鎺ユ敹鏁版嵁鐨刴qtt topic锛屽湪IOT骞冲彴閰嶇疆鐨� + */ + @Value("${mqtt.dataReceiveTopic:}") + public String COLLECT_DATA_TOPIC; /** * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞� @@ -61,7 +66,10 @@ public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; - private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed"; + /** + * 宸ヤ綅鏁版嵁鐐瑰彉鍖� + */ + private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed"; @Bean public MqttPahoClientFactory mqttClientFactory() { @@ -93,7 +101,7 @@ String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, - mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 + mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); @@ -123,10 +131,15 @@ logger.info("宸ヤ綅鍒涘缓鎺ユ敹娑堟伅={}",workstationId); stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId)); recService.handle((String)message.getPayload()); - }else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) { - String workstationId = (String)message.getPayload(); - logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",workstationId); + }else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) { + String payload = (String)message.getPayload(); + logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",payload); workstationAppMappingService.saveToCache(); + //娓呴櫎璇ュ伐浣嶇殑鏁版嵁鐐圭紦瀛� + JSONObject payloadObj = JSONObject.parseObject(payload); + //payloadObj.getLong("workstationId"); + + dpService.datapointsCacheEvict(payloadObj.getString("appId")); } else {//璁㈤槄浜嗗嚑涓猼opic灏变細鎺ユ敹鍒板嚑涓紝鍏朵粬鐨勪笉浼氳繘鏉� logger.warn("topic={},msg={},鏃犲搴旂殑澶勭悊鍣�",topic,message.getPayload()); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java index 18d1bc8..1638041 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java @@ -41,7 +41,7 @@ public void testRec2() { //鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d //澶氭潯鏍煎紡锛歿"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]} - String payload = "{\"174\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}"; + String payload = "{\"182\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}"; //payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}"; recService.handle(payload); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java b/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java index e3ad96e..6d0d3ec 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java @@ -10,6 +10,10 @@ */ public class TelemetryData { private long workstationId; + /** + * iotdb鐨刟ppId + */ + private String appId; private List<TelemetryDataItem> dataItems = new ArrayList<>(); public void addItem(TelemetryDataItem item) { @@ -35,9 +39,16 @@ public void setDataItems(List<TelemetryDataItem> dataItems) { this.dataItems = dataItems; } + + public String getAppId() { + return appId; + } + public void setAppId(String appId) { + this.appId = appId; + } @Override public String toString() { - return "TelemetryData [workstationId=" + workstationId + ", dataItems=" + dataItems + "]"; + return "TelemetryData [workstationId=" + workstationId + ", appId=" + appId + ", dataItems=" + dataItems + "]"; } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java b/collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java new file mode 100644 index 0000000..0685661 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java @@ -0,0 +1,65 @@ +package com.qianwen.mdc.collect.entity.mgr; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.qianwen.core.mp.base.BaseEntity; + +import io.swagger.annotations.ApiModelProperty; + + +/** + * 宸ヤ綅閲囬泦鏁版嵁鐐� + */ +@TableName("workstation_datapoints") +public class WorkstationDatapoints extends BaseEntity { + private static final long serialVersionUID = 1; + + @ApiModelProperty("妯℃澘绫诲瀷") + private Integer type; + + @ApiModelProperty("鐐逛綅閰嶇疆(json鏁扮粍)") + private String dpConfig; + + /** + * 宸ヤ綅id + */ + private long workstationId; + + /** + * IOT骞冲彴appId + */ + private String appId; + + public Integer getType() { + return type; + } + + public void setType(Integer type) { + this.type = type; + } + + public String getDpConfig() { + return dpConfig; + } + + public void setDpConfig(String dpConfig) { + this.dpConfig = dpConfig; + } + + public long getWorkstationId() { + return workstationId; + } + + public void setWorkstationId(long workstationId) { + this.workstationId = workstationId; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java index bd0c2f2..de40a7b 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java @@ -1,12 +1,16 @@ package com.qianwen.mdc.collect.handler; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.dto.PackedTelemetryData; import com.qianwen.mdc.collect.entity.iotdb.Alarm; import com.qianwen.mdc.collect.service.AlarmService; @@ -20,39 +24,38 @@ @Override public void handleData(PackedTelemetryData data) { - Alarm alarm = new Alarm();//Objects.requireNonNull(BeanUtil.copy(data, Alarm.class)); - alarm.setTime(data.getTime()); - alarm.setWorkstationId(data.getWorkstationId()); - alarm.setCalendarCode(data.getCalendarCode()); - alarm.setFactoryDate(data.getFactoryDate()); - alarm.setFactoryMonth(data.getFactoryMonth()); - alarm.setFactoryWeek(data.getFactoryWeek()); - alarm.setFactoryYear(data.getFactoryYear()); - alarm.setShiftIndex(data.getShiftIndex()); - alarm.setShiftTimeType(data.getShiftTimeType()); - fileAlarmDetail(alarm,data); - alarmService.saveAlarms(Arrays.asList(alarm)); + alarmService.saveAlarms(parseAlarm(data)); - log.info("鎶ヨ鏁版嵁澶勭悊瀹屾垚:鍛婅鏁版嵁{} ",alarm); + log.info("鎶ヨ鏁版嵁澶勭悊瀹屾垚:鍛婅鏁版嵁{} ",data); } - /** - * 濉厖鍛婅鐨勮缁嗕俊鎭紝绾у埆锛屼俊鎭紝浠g爜 - * @param alarm - */ - void fileAlarmDetail(Alarm alarm,PackedTelemetryData data){ - alarm.setCode(this.parseCode(data.getValue())); - alarm.setMessage(data.getValue()); + List<Alarm> parseAlarm(PackedTelemetryData data){ + List<Alarm> alarmList = new ArrayList<>(); + //828d json鏁扮粍鏍煎紡[{"alarmNo":"8084"}] + JSONArray alarmArr = JSONArray.parseArray(data.getValue()); + + for(int i=0;i<alarmArr.size();i++) { + Alarm alarm = new Alarm(); + alarm.setTime(data.getTime()); + alarm.setWorkstationId(data.getWorkstationId()); + alarm.setCalendarCode(data.getCalendarCode()); + alarm.setFactoryDate(data.getFactoryDate()); + alarm.setFactoryMonth(data.getFactoryMonth()); + alarm.setFactoryWeek(data.getFactoryWeek()); + alarm.setFactoryYear(data.getFactoryYear()); + alarm.setShiftIndex(data.getShiftIndex()); + alarm.setShiftTimeType(data.getShiftTimeType()); + + JSONObject alarmJson = alarmArr.getJSONObject(i); + alarm.setCode(alarmJson.getString("alarmNo")); + + alarm.setMessage("");//鏆傛椂娌℃湁锛岄渶瑕佸弽鏌� + + alarmList.add(alarm); + } + return alarmList; } - /** - * 瑙f瀽鎶ヨ浠g爜 - * @param collectText - * @return - */ - String parseCode(String collectText) { - return "0000"; - } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java b/collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java new file mode 100644 index 0000000..b7b128e --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java @@ -0,0 +1,11 @@ +package com.qianwen.mdc.collect.mapper.mgr; + +import com.qianwen.core.mp.mapper.BladeMapper; +import com.qianwen.mdc.collect.entity.mgr.WorkstationDatapoints; + +/** + * 宸ヤ綅鏁版嵁鐐筸apper + */ +public interface WorkstationDatapointsMapper extends BladeMapper<WorkstationDatapoints> { + +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java b/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java index 4bc3d32..e97e613 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java @@ -30,13 +30,13 @@ private TimeSliceCache timeSliceCache; @Autowired private DeviceStateMapper deviceStateMapper; - @Autowired - private WorkstationAppMappingService mappingService;; + //@Autowired + //private WorkstationAppMappingService mappingService;; @Override public void run(ApplicationArguments args) throws Exception { - mappingService.saveToCache(); + //mappingService.saveToCache(); //鐢熸垚鏃堕棿鍒囩墖 CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).targetDate(LocalDate.now()).build(); @@ -48,7 +48,7 @@ //@RedisLock("posting:lock:initStateFixPoint") public void checkNeedStateFixPoint() { DateTime dateTime = DateTime.now(); - log.info("杩涘叆绋嬪簭鍚姩鏍¢獙鏄惁瀛樺湪宸ヤ綅鎵撹繃鍥哄畾鐐�....... "); + log.info("绋嬪簭鍚姩鏍¢獙鏄惁瀛樺湪宸ヤ綅鎵撹繃鍥哄畾鐐�....... "); Long count = deviceStateMapper.fixPointCountByDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(dateTime))); /* diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java index 6e86e38..f599308 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java @@ -25,6 +25,7 @@ import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.mqtt.MqttMessageSender; +import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; /** * 閲囬泦鏁版嵁澶勭悊鍏ュ簱 @@ -33,7 +34,6 @@ public class CollectDataService { private static final Logger log = LoggerFactory.getLogger(CollectDataService.class); - //private String DB_PREFIX = "root.f2."; private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); @Autowired private IotDBSessionConfig iotdbConfig; @@ -41,6 +41,7 @@ private IotDBCommonService iotDBCommonService; @Autowired private MqttMessageSender mqttMessageSender; + /** * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚� @@ -63,6 +64,7 @@ public void handleCollectData(List<TelemetryData> telemetryDataList) { for (TelemetryData dt : telemetryDataList) { + handleOneWorkstation(dt); sendRealtimeDataMsg(dt); @@ -113,14 +115,14 @@ * @param dt */ void handleOneWorkstation(TelemetryData dt) { - String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); - - List<MeasurementSchema> schemas = new ArrayList<>(); + String deviceId; + //long workstationId = dpVo.getWorkstationId(); + + List<MeasurementSchema> schemas = new ArrayList<>(); schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); - int rowIndex = 0; @@ -137,6 +139,8 @@ iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId); Tablet tablet = new Tablet(deviceId, schemas); for(TypedTelemetryData tdata : typeList) { + + rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, tdata.getTime()); tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId()); @@ -144,18 +148,16 @@ if(!tdata.getName().equals("Alarm")) { tablet.addValue("v",rowIndex,tdata.getValue()); }else { - //鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡鏈猨son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} + //鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡涓簀son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue())); } } try { iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - //updateLastParam(dt.getWorkstationId(),typeList); } catch (Exception e) { log.error("IOTDB鍏ュ簱澶辫触",e); - e.printStackTrace(); }finally { //iotdbConfig.getSessionPool().clo1se(); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java index 63e7ee7..3246a2f 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -16,6 +15,7 @@ import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.utils.redis.RedisUtil; +import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; @Service public class IOTMqttReceiveService { @@ -26,13 +26,14 @@ @Autowired private PackedDataService packedDataService; @Autowired - private RedisUtil redisUtil; + private RedisUtil redisUtil; + @Autowired + private WorkstationDatapointsService dpService; /** * 澶勭悊鏀跺埌鐨勬秷鎭�,瀵瑰簲TelemetryDataPostingConsumer * @param payload */ public void handle(String payload) { - //System.out.println("Received message122: " + payload); //瑙f瀽娑堟伅 List<TelemetryData> teleList = parsePayload(payload); @@ -48,22 +49,32 @@ */ List<TelemetryData> parsePayload(String payload){ List<TelemetryData> dtList = new ArrayList<TelemetryData> (); - //鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d - //澶氭潯鏍煎紡锛歿"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} + //澶氭潯 鏁版嵁鏍煎紡锛歿"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174鏄簲鐢╥d //瑙f瀽娑堟伅 name,value褰㈠紡锛屽n=output,v=11 JSONObject jsonObj = JSONObject.parseObject(payload); Set<String> keySet = jsonObj.keySet(); String[] keys = keySet.toArray(new String[] {}); + //WorkstationDatapointsVO dpVo; + final String NEWDP_SUFFIX = "_n";//璁$畻瑙勫垯浣跨敤涔嬪悗鏂版暟鎹偣鐨勭粨灏� for(String key : keys) { String appId = key;//iot绯荤粺涓殑搴旂敤id锛屾湰搴旂敤涓簲璇ョ敤琛ㄥ幓瀵瑰簲 - long workstationId = getWorkstationIdByAppId(appId); + + + //TODO 鑾峰彇宸ヤ綅鏁版嵁鐐归厤缃�,鍙繚瀛橀厤缃ソ鐨勬暟鎹偣锛屾病鏈夐厤缃殑閲囬泦鏁版嵁鎶涘純銆� + final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); + if(dpVo == null) { + //宸ヤ綅娌℃湁瀹氫箟杩囨暟鎹偣鎴栬�卆ppId涓嶅尮閰� + log.warn("appId={}鏈壘鍒版暟鎹偣瀹氫箟璁板綍锛屼涪寮冩暟鎹�",appId); + continue; + } TelemetryData tdata = new TelemetryData(); - tdata.setWorkstationId(workstationId); - + //tdata.setWorkstationId(workstationId); + tdata.setAppId(appId); + tdata.setWorkstationId(dpVo.getWorkstationId()); JSONArray dtArr = jsonObj.getJSONArray(appId); for(int i=0;i<dtArr.size();i++) { @@ -81,6 +92,10 @@ if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) { oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX); } + if(!dpVo.containsDataPoint(oriValueKey)) { + //濡傛灉涓嶅瓨鍦ㄨ鏁版嵁鐐归厤缃紝璇ユ暟鎹洿鎺ュ拷鐣� + return; + } tdataItem.addPoint(oriValueKey,values.getString(valueKey));//浣跨敤鍘熷閰嶇疆鐐逛繚鎸佷繚瀛樻暟鎹� }); @@ -98,11 +113,16 @@ * @param appId * @return */ - public long getWorkstationIdByAppId(String appId) { + /* + public Long getWorkstationIdByAppId(String appId) { Object wid = redisUtil.hget("workstation-appid-map", appId); - String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId)); - + + String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY)); + if(ObjectUtil.isEmpty(workstationId)) { + return null; + } return Long.parseLong(workstationId); } + */ } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java index 99392cd..2e5d115 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java @@ -47,6 +47,8 @@ private WorkstationCache workstationCache; @Autowired private TimeSliceCache timeSliceCache; + @Autowired + private WorkstationDatapointsService dpService; static { PROCESS_PARAM_MAP.put(1, "STATE"); @@ -91,12 +93,13 @@ String[] keys = map.keySet().toArray(new String[0]); for (int j = 0; j < keys.length; j++) { //TODO: 杩欓噷锛屽師绯荤粺杩涜浜嗚繃婊ゃ�俉orkstationCollectDataServiceImpl.handlerWorkstationCollectData閲� + PackedTelemetryData pkData = new PackedTelemetryData(); pkData.setWorkstationId(tdata.getWorkstationId()); pkData.setValue(map.get(keys[j])); pkData.setTime(item.getTime()); - pkData.setName(keys[j]);//鍙傛暟鍚嶇О + pkData.setName(keys[j]);//鏁版嵁鐐瑰悕绉� fillByCalendar(pkData); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java new file mode 100644 index 0000000..7762b5b --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java @@ -0,0 +1,63 @@ +package com.qianwen.mdc.collect.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.qianwen.core.mp.base.BaseServiceImpl; +import com.qianwen.mdc.collect.entity.mgr.WorkstationDatapoints; +import com.qianwen.mdc.collect.mapper.mgr.WorkstationDatapointsMapper; +import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; +/** + * 宸ヤ綅鏁版嵁鐐规湇鍔★紝涓昏鐢ㄤ簬鑾峰彇鏁版嵁鐐瑰畾涔夈�傚畾涔夊湪smartman搴旂敤 + */ +@Service +public class WorkstationDatapointsService extends BaseServiceImpl<WorkstationDatapointsMapper, WorkstationDatapoints> { + private Logger log = LoggerFactory.getLogger(this.getClass()); + + + /** + * 鑾峰彇宸ヤ綅鏁版嵁鐐归厤缃� ,浠庣紦瀛� + * @param workstationId + * @return + */ + @Cacheable(value = "collect:datapoint" ,key = "#appId") + public WorkstationDatapointsVO getDatapointsByAppIdFromCache(String appId) { + return this.getDataPointByAppId(appId); + } + + /* + @CachePut(value = "collect:datapoint" ,key = "#appId") + public WorkstationDatapointsVO datapointsCachePut(String appId) { + return this.getDataPointByAppId(appId); + } + */ + + @CacheEvict(value = "collect:datapoint" ,key = "#appId") + public void datapointsCacheEvict(String appId) { + + } + /** + * 浠庢暟鎹簱鏌ヨ鏁版嵁鐐瑰畾涔� + * @param workstationId 宸ヤ綅id + * @return + */ + WorkstationDatapointsVO getDataPointByAppId(String appId) { + log.info("appid={}", appId); + WorkstationDatapoints dp = baseMapper.selectOne(Wrappers.<WorkstationDatapoints>lambdaQuery() + .eq(WorkstationDatapoints::getAppId, appId)); + + log.info("dp={}", dp); + + WorkstationDatapointsVO dpVO = null; + + if (dp != null) { + dpVO = new WorkstationDatapointsVO(dp.getWorkstationId(), dp.getAppId(), dp.getDpConfig()); + } + return dpVO; + } + +} diff --git a/collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java b/collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java new file mode 100644 index 0000000..7404419 --- /dev/null +++ b/collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java @@ -0,0 +1,143 @@ +package com.qianwen.mdc.collect.vo; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.json.JSONUtil; +import io.swagger.annotations.ApiModelProperty; + + +/** + * 宸ヤ綅閲囬泦鏁版嵁鐐筕O + */ +public class WorkstationDatapointsVO implements Serializable{ + + /** + * 搴忓垪鍖栵紝搴斾负闇�瑕乻pring缂撳瓨 + */ + private static final long serialVersionUID = 6558493027948435061L; + + @ApiModelProperty("鐐逛綅琛ㄥご(json鏁扮粍)") + private String dpHead; + + @ApiModelProperty("鐐逛綅閰嶇疆(json鏁扮粍)") + private String dpConfig; + + /** + * 宸ヤ綅id + */ + private long workstationId; + + /** + * IOT骞冲彴appId + */ + private String appId; + + //private List<DataPoint> points = null; + private List<String> points = new ArrayList<>(); + + public WorkstationDatapointsVO(long workstationId, String appId,String dpConfig) { + super(); + this.dpConfig = dpConfig; + this.workstationId = workstationId; + this.appId = appId; + + initPoints(); + } + void initPoints() { + if(ObjectUtil.isEmpty(dpConfig)) { + return; + } + + JSONArray ptArr = JSONArray.parseArray(dpConfig); + + points = new ArrayList<>(); + JSONObject ptObj; + for(int i=0;i<ptArr.size();i++) { + ptObj = ptArr.getJSONObject(i); + + //DataPoint dp = new DataPoint(); + //dp.setDpName(ptObj.getString("dpName")); + + points.add(ptObj.getString("dpName")); + } + } + + public String getDpConfig() { + return dpConfig; + } + + public void setDpConfig(String dpConfig) { + this.dpConfig = dpConfig; + } + + public long getWorkstationId() { + return workstationId; + } + + public void setWorkstationId(long workstationId) { + this.workstationId = workstationId; + } + + public String getDpHead() { + return dpHead; + } + + public void setDpHead(String dpHead) { + this.dpHead = dpHead; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + + /** + * 鍒ゆ柇閲囬泦鍙橀噺鍚嶇О鏄惁鍦ㄦ暟鎹偣閰嶇疆绉� + * @param dpName + * @return + */ + public boolean containsDataPoint(String dpName) { + if(ObjectUtil.isEmpty(points)) { + return false; + } + + return points.contains(dpName); + /* + for(String dpn : points) { + if(StringUtils.equals(dpn, dpName)) { + return true; + } + } + return false; + */ + } + + +} +/* +class DataPoint{ + private String dpName; + + public String getDpName() { + return dpName; + } + + public void setDpName(String dpName) { + this.dpName = dpName; + } + + +} +*/ diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml index dda4930..afd9ca6 100644 --- a/collect/src/main/resources/application-dev.yml +++ b/collect/src/main/resources/application-dev.yml @@ -23,7 +23,7 @@ #蹇冭烦鏃堕棿 keepalive: 10 connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿 - + dataReceiveTopic: forward/test #浠巌ot骞冲彴鎺ユ敹mqtt閲囬泦鏁版嵁鐨則opic # mysql datasource: type: mysql @@ -35,7 +35,7 @@ #iotdb 浠ュ強鍏秊dbc涓�璧烽厤缃� iotdb: driver: org.apache.iotdb.jdbc.IoTDBDriver - host: 120.46.212.231 + host: localhost #120.46.212.231 port: 6667 maxSize: 100 username: root diff --git a/collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java b/collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java new file mode 100644 index 0000000..cd44a41 --- /dev/null +++ b/collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java @@ -0,0 +1,14 @@ +package com.qianwen.mdc.collect.service; + +import static org.junit.Assert.assertNull; + +import org.junit.jupiter.api.Test; + +public class IOTMqttReceiveServiceTest { + + @Test + public void t() { + + //System.out.println(a == null); + } +} -- Gitblit v1.9.3