| | |
| | | import org.springframework.messaging.MessageChannel; |
| | | import org.springframework.messaging.MessageHandler; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.qianwen.mdc.collect.service.DeviceStateFixPointService; |
| | | import com.qianwen.mdc.collect.service.IOTMqttReceiveService; |
| | | import com.qianwen.mdc.collect.service.WorkstationAppMappingService; |
| | | import com.qianwen.mdc.collect.service.WorkstationDatapointsService; |
| | | import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService; |
| | | |
| | | import cn.hutool.core.date.DateTime; |
| | |
| | | |
| | | @Value("${mqtt.password:}") |
| | | private String mqttPassword; |
| | | |
| | | @Autowired |
| | | private IOTMqttReceiveService recService; |
| | | @Autowired |
| | | private DeviceStateFixPointService stateFixPointService; |
| | | |
| | | @Autowired |
| | | private WorkstationFeedbackService workstationFeedbackService; |
| | | |
| | | @Autowired |
| | | private WorkstationAppMappingService workstationAppMappingService; |
| | | |
| | | public static final String COLLECT_DATA_TOPIC = "forward/test"; |
| | | @Autowired |
| | | private WorkstationDatapointsService dpService; |
| | | /** |
| | | * 接收数据的mqtt topic,在IOT平台配置的 |
| | | */ |
| | | @Value("${mqtt.dataReceiveTopic:}") |
| | | public String COLLECT_DATA_TOPIC; |
| | | |
| | | /** |
| | | * 反馈创建的topic(mdc中),本应用接收并处理 |
| | |
| | | |
| | | public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; |
| | | |
| | | private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed"; |
| | | /** |
| | | * 工位数据点变化 |
| | | */ |
| | | private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed"; |
| | | |
| | | @Bean |
| | | public MqttPahoClientFactory mqttClientFactory() { |
| | |
| | | String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000); |
| | | MqttPahoMessageDrivenChannelAdapter adapter = |
| | | new MqttPahoMessageDrivenChannelAdapter(clientId, |
| | | mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//最后一个参数允许多个topic参数 |
| | | mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//最后一个参数允许多个topic参数 |
| | | adapter.setCompletionTimeout(5000); |
| | | adapter.setConverter(new DefaultPahoMessageConverter()); |
| | | adapter.setQos(1); |
| | |
| | | logger.info("工位创建接收消息={}",workstationId); |
| | | stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId)); |
| | | recService.handle((String)message.getPayload()); |
| | | }else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) { |
| | | String workstationId = (String)message.getPayload(); |
| | | logger.info("工位appId映射变化消息={}",workstationId); |
| | | }else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) { |
| | | String payload = (String)message.getPayload(); |
| | | logger.info("工位appId映射变化消息={}",payload); |
| | | workstationAppMappingService.saveToCache(); |
| | | //清除该工位的数据点缓存 |
| | | JSONObject payloadObj = JSONObject.parseObject(payload); |
| | | //payloadObj.getLong("workstationId"); |
| | | |
| | | dpService.datapointsCacheEvict(payloadObj.getString("appId")); |
| | | } else {//订阅了几个topic就会接收到几个,其他的不会进来 |
| | | logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload()); |
| | | } |