| | |
| | | import com.qianwen.core.websocket.distribute.RedisMessageDistributor; |
| | | import com.qianwen.smartman.common.cache.cps.WorkstationCache; |
| | | import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage; |
| | | import com.qianwen.smartman.modules.cps.message.dto.TelemetryDataResponseDTO; |
| | | import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO; |
| | | import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO; |
| | | import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData; |
| | | import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper; |
| | |
| | | |
| | | @Value("${mqtt.password:}") |
| | | private String mqttPassword; |
| | | |
| | | @Value("${mqtt.isRecevieMsg:false}") |
| | | private boolean isRecevieMsg; |
| | | @Autowired |
| | | private RedisMessageDistributor messageDistributor; |
| | | |
| | |
| | | |
| | | String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000); |
| | | |
| | | MqttPahoMessageDrivenChannelAdapter adapter = |
| | | new MqttPahoMessageDrivenChannelAdapter(clientId, |
| | | mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数 |
| | | MqttPahoMessageDrivenChannelAdapter adapter; |
| | | if(!this.isRecevieMsg) { |
| | | adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, |
| | | mqttClientFactory(),DEFAULT_TOPIC); |
| | | }else { |
| | | adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, |
| | | mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数 |
| | | } |
| | | |
| | | adapter.setCompletionTimeout(5000); |
| | | adapter.setConverter(new DefaultPahoMessageConverter()); |
| | | adapter.setQos(1); |
| | |
| | | @Bean |
| | | @ServiceActivator(inputChannel = "mqttInputChannel") |
| | | public MessageHandler handler() { |
| | | |
| | | return message -> { |
| | | String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); |
| | | if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) { |
| | |
| | | Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+""); |
| | | |
| | | realTimeDataService.addPreTimeInDeviceStatus(workstationId, map); |
| | | /* |
| | | NOTE(review): everything inside this block comment is disabled code. |
| | | It read the "DeviceStatus" entry from map, rebuilt it as a JSON object with the |
| | | keys "t", "v" and "preT", and put that object back into map under the same key. |
| | | preT defaulted to the entry's own timestamp and was replaced by the time of the |
| | | record returned by the mapper lookup when one was found. |
| | | Presumably superseded by realTimeDataService.addPreTimeInDeviceStatus(...) above - TODO confirm, |
| | | then delete this block. |
| | | // Locate the device-status entry and add the previous status's data to it |
| | | final String statusDpName = "DeviceStatus"; |
| | | if(map.containsKey(statusDpName)) { |
| | | TelemetryDataResponseDTO statusDTO = (TelemetryDataResponseDTO) map.get(statusDpName); |
| | | JSONObject statusJson = new JSONObject(); |
| | | statusJson.put("t", statusDTO.getT()); |
| | | statusJson.put("v", statusDTO.getV()); |
| | | |
| | | long preT = statusDTO.getT(); |
| | | // The most recent record whose value (v) differs from the current one |
| | | ProcessParameterVO diffStatusVO = processParamMapper.lastParameterNotEqValue(workstationId, statusDpName, statusDTO.getV()); |
| | | |
| | | ProcessParameterVO tempStatusVO; |
| | | if(diffStatusVO != null) { |
| | | tempStatusVO = processParamMapper.firstParameterEqValueGtTime(workstationId, statusDpName, statusDTO.getV(), diffStatusVO.getTime().getTime()); |
| | | }else { |
| | | tempStatusVO = processParamMapper.firstParameterEqValue(workstationId, statusDpName, statusDTO.getV()); |
| | | } |
| | | if(tempStatusVO != null) { |
| | | preT = tempStatusVO.getTime().getTime(); |
| | | } |
| | | statusJson.put("preT", preT); |
| | | |
| | | map.put(statusDpName, statusJson);// overwrite the original DeviceStatus entry |
| | | } |
| | | */ |
| | | |
| | | jsonWebSocketMessage.setData(map); |
| | | MessageDO messageDO = new MessageDO(); |
| | |
| | | messageDO.setMessageText(msgtxt); |
| | | |
| | | messageDistributor.distribute(messageDO); |
| | | |
| | | } |
| | | }; |
| | | } |