| | |
| | | package com.qianwen.smartman.common.config; |
| | | |
| | | import java.util.Arrays; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.Random; |
| | | |
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
| | |
| | | import org.springframework.messaging.MessageChannel; |
| | | import org.springframework.messaging.MessageHandler; |
| | | |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.qianwen.core.websocket.distribute.MessageDO; |
| | | import com.qianwen.core.websocket.distribute.RedisMessageDistributor; |
| | | import com.qianwen.smartman.common.cache.cps.WorkstationCache; |
| | | import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage; |
| | | import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO; |
| | | import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO; |
| | | import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData; |
| | | import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper; |
| | | import com.qianwen.smartman.modules.mdc.service.RealTimeDataService; |
| | | |
| | | import cn.hutool.core.date.DateTime; |
| | | import cn.hutool.core.util.ObjectUtil; |
| | | import cn.hutool.json.JSONUtil; |
| | | |
| | | @Configuration |
| | | public class MdcMqttConfig { |
| | |
| | | |
| | | @Value("${mqtt.password:}") |
| | | private String mqttPassword; |
| | | |
| | | @Value("${mqtt.isRecevieMsg:false}") |
| | | private boolean isRecevieMsg; |
| | | @Autowired |
| | | private RedisMessageDistributor messageDistributor; |
| | | |
| | | //@Autowired |
| | | //private SuperProcessParameterMapper processParamMapper; |
| | | |
| | | @Autowired |
| | | private RealTimeDataService realTimeDataService; |
| | | public static final String DEFAULT_TOPIC = "mdc"; |
| | | /** |
| | | * 反馈创建的topic(mdc中),本应用接收并处理 |
| | |
| | | |
| | | public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; |
| | | |
| | | public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; |
| | | @Bean |
| | | public MqttPahoClientFactory mqttClientFactory() { |
| | | DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
| | |
| | | * |
| | | * @return |
| | | */ |
| | | /* |
| | | |
| | | @Bean |
| | | public MessageProducer inbound() { |
| | | MqttPahoMessageDrivenChannelAdapter adapter = |
| | | new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound", |
| | | mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//最后一个参数允许多个topic参数 |
| | | java.util.Random r = new java.util.Random(); |
| | | |
| | | String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000); |
| | | |
| | | MqttPahoMessageDrivenChannelAdapter adapter; |
| | | if(!this.isRecevieMsg) { |
| | | adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, |
| | | mqttClientFactory(),DEFAULT_TOPIC); |
| | | }else { |
| | | adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, |
| | | mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数 |
| | | } |
| | | |
| | | adapter.setCompletionTimeout(5000); |
| | | adapter.setConverter(new DefaultPahoMessageConverter()); |
| | | adapter.setQos(1); |
| | | adapter.setOutputChannel(mqttInputChannel()); |
| | | return adapter; |
| | | } |
| | | */ |
| | | |
| | | |
| | | /** |
| | | * 入站消息处理 |
| | |
| | | @Bean |
| | | @ServiceActivator(inputChannel = "mqttInputChannel") |
| | | public MessageHandler handler() { |
| | | |
| | | return message -> { |
| | | String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); |
| | | if(FEEDBACK_TOPIC.equals(topic) ) { |
| | | |
| | | if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) { |
| | | //实时数据来了,数据从collect发送来 |
| | | String payload = (String)message.getPayload(); |
| | | logger.info("接收到mqtt消息readtime,data={}",payload);//消息体如何定义,数据点名称,值,时间,用json对象传过来 |
| | | //TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem); |
| | | JSONObject payLoadJson = JSONObject.parseObject(payload); |
| | | long workstationId = payLoadJson.getLong("workstationId"); |
| | | String name = payLoadJson.getString("name");//key? |
| | | long time = payLoadJson.getLong("time"); |
| | | String v = payLoadJson.getString("value"); |
| | | |
| | | //将数据加入缓存 |
| | | TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time); |
| | | WorkstationCache.setWorkstationRealTime(workstationId, name, telemetryDataResponseDTO); |
| | | |
| | | //发送websocket消息 |
| | | RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage(); |
| | | jsonWebSocketMessage.setId(""+workstationId); |
| | | |
| | | |
| | | //从缓存(或直接查询)加载最新的数据,发送websocket |
| | | Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+""); |
| | | |
| | | realTimeDataService.addPreTimeInDeviceStatus(workstationId, map); |
| | | |
| | | jsonWebSocketMessage.setData(map); |
| | | MessageDO messageDO = new MessageDO(); |
| | | messageDO.setNeedBroadcast(false); |
| | | String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage); |
| | | logger.info("websockettxt={}",msgtxt); |
| | | messageDO.setMessageText(msgtxt); |
| | | |
| | | messageDistributor.distribute(messageDO); |
| | | } |
| | | }; |
| | | } |