From aa15132c1992c56b679dcafafae9a1b1aa960830 Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期一, 28 十月 2024 16:17:42 +0800 Subject: [PATCH] 增加是否处理mqtt消息的配置;过程参数更改参数 --- smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java | 44 ++++++++++++-------------------------------- 1 files changed, 12 insertions(+), 32 deletions(-) diff --git a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java index acc2efa..c2681ee 100644 --- a/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java +++ b/smart-man-boot/src/main/java/com/qianwen/smartman/common/config/MdcMqttConfig.java @@ -50,7 +50,8 @@ @Value("${mqtt.password:}") private String mqttPassword; - + @Value("${mqtt.isRecevieMsg:false}") + private boolean isRecevieMsg; @Autowired private RedisMessageDistributor messageDistributor; @@ -98,9 +99,15 @@ String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000); - MqttPahoMessageDrivenChannelAdapter adapter = - new MqttPahoMessageDrivenChannelAdapter(clientId, - mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 + MqttPahoMessageDrivenChannelAdapter adapter; + if(!this.isRecevieMsg) { + adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, + mqttClientFactory(),DEFAULT_TOPIC); + }else { + adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, + mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 + } + adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); @@ -116,6 +123,7 @@ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { + return message -> { String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) { @@ -142,33 +150,6 @@ Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+""); realTimeDataService.addPreTimeInDeviceStatus(workstationId, map); - /* - //鎵惧埌鐘舵�佺殑鏁版嵁锛屽姞鍏ヤ笂涓�涓姸鎬佺殑 - final String statusDpName = "DeviceStatus"; - if(map.containsKey(statusDpName)) { - TelemetryDataResponseDTO statusDTO = (TelemetryDataResponseDTO) map.get(statusDpName); - JSONObject statusJson = new JSONObject(); - statusJson.put("t", statusDTO.getT()); - statusJson.put("v", statusDTO.getV()); - - long preT = statusDTO.getT(); - //鏈�鍚庝竴鏉′笉鍚屽��(v)鐨勬暟鎹� - ProcessParameterVO diffStatusVO = processParamMapper.lastParameterNotEqValue(workstationId, statusDpName, statusDTO.getV()); - - ProcessParameterVO tempStatusVO; - if(diffStatusVO != null) { - tempStatusVO = processParamMapper.firstParameterEqValueGtTime(workstationId, statusDpName, statusDTO.getV(), diffStatusVO.getTime().getTime()); - }else { - tempStatusVO = processParamMapper.firstParameterEqValue(workstationId, statusDpName, statusDTO.getV()); - } - if(tempStatusVO != null) { - preT = tempStatusVO.getTime().getTime(); - } - statusJson.put("preT", preT); - - map.put(statusDpName, statusJson);//瑕嗙洊鍘熸潵鐨凞eviceStatus - } - */ jsonWebSocketMessage.setData(map); MessageDO messageDO = new MessageDO(); @@ -178,7 +159,6 @@ messageDO.setMessageText(msgtxt); messageDistributor.distribute(messageDO); - } }; } -- Gitblit v1.9.3