From 859d6321b1b1c606de09e9b6a6286aaeace638fe Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期三, 16 十月 2024 21:03:52 +0800 Subject: [PATCH] 接收采集数据,增加了使用配置数据点限制,配置了的数据点才会保存。未配置则丢弃 --- collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java | 31 ++++++++++++++++++++++--------- 1 files changed, 22 insertions(+), 9 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java index df16c29..21800ca 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java @@ -21,9 +21,11 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.service.DeviceStateFixPointService; import com.qianwen.mdc.collect.service.IOTMqttReceiveService; import com.qianwen.mdc.collect.service.WorkstationAppMappingService; +import com.qianwen.mdc.collect.service.WorkstationDatapointsService; import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService; import cn.hutool.core.date.DateTime; @@ -40,19 +42,22 @@ @Value("${mqtt.password:}") private String mqttPassword; - @Autowired private IOTMqttReceiveService recService; @Autowired private DeviceStateFixPointService stateFixPointService; - @Autowired private WorkstationFeedbackService workstationFeedbackService; - @Autowired private WorkstationAppMappingService workstationAppMappingService; - public static final String COLLECT_DATA_TOPIC = "forward/test"; + @Autowired + private WorkstationDatapointsService dpService; + /** + * 鎺ユ敹鏁版嵁鐨刴qtt topic锛屽湪IOT骞冲彴閰嶇疆鐨� + */ + @Value("${mqtt.dataReceiveTopic:}") + public String COLLECT_DATA_TOPIC; /** * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞� @@ -61,7 +66,10 @@ public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; - private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed"; + /** + * 宸ヤ綅鏁版嵁鐐瑰彉鍖� + */ + private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed"; @Bean public MqttPahoClientFactory mqttClientFactory() { @@ -93,7 +101,7 @@ String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, - mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 + mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); @@ -123,10 +131,15 @@ logger.info("宸ヤ綅鍒涘缓鎺ユ敹娑堟伅={}",workstationId); stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId)); recService.handle((String)message.getPayload()); - }else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) { - String workstationId = (String)message.getPayload(); - logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",workstationId); + }else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) { + String payload = (String)message.getPayload(); + logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",payload); workstationAppMappingService.saveToCache(); + //娓呴櫎璇ュ伐浣嶇殑鏁版嵁鐐圭紦瀛� + JSONObject payloadObj = JSONObject.parseObject(payload); + //payloadObj.getLong("workstationId"); + + dpService.datapointsCacheEvict(payloadObj.getString("appId")); } else {//璁㈤槄浜嗗嚑涓猼opic灏变細鎺ユ敹鍒板嚑涓紝鍏朵粬鐨勪笉浼氳繘鏉� logger.warn("topic={},msg={},鏃犲搴旂殑澶勭悊鍣�",topic,message.getPayload()); } -- Gitblit v1.9.3