package com.qianwen.smartman.common.config; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Random; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import com.alibaba.fastjson.JSONObject; import com.qianwen.core.websocket.distribute.MessageDO; import com.qianwen.core.websocket.distribute.RedisMessageDistributor; import com.qianwen.smartman.common.cache.cps.WorkstationCache; import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage; import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO; import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO; import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData; import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper; import com.qianwen.smartman.modules.mdc.service.RealTimeDataService; import cn.hutool.core.date.DateTime; import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONUtil; @Configuration public class MdcMqttConfig { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${mqtt.host}") private String mqttHost; @Value("${mqtt.username:}") private String mqttUserName; @Value("${mqtt.password:}") private String mqttPassword; @Value("${mqtt.isRecevieMsg:false}") private boolean isRecevieMsg; @Autowired private RedisMessageDistributor messageDistributor; //@Autowired //private SuperProcessParameterMapper processParamMapper; @Autowired private RealTimeDataService realTimeDataService; public static final String DEFAULT_TOPIC = "mdc"; /** * 反馈创建的topic(mdc中),本应用接收并处理 */ //public static final String FEEDBACK_TOPIC = "mdc/feedback"; public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create"; public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" if(ObjectUtil.isNotEmpty(mqttUserName)) { options.setUserName(mqttUserName); options.setPassword(mqttPassword.toCharArray()); } factory.setConnectionOptions(options); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * * @return */ @Bean public MessageProducer inbound() { java.util.Random r = new java.util.Random(); String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000); MqttPahoMessageDrivenChannelAdapter adapter; if(!this.isRecevieMsg) { adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),DEFAULT_TOPIC); }else { adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数 } adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 入站消息处理 * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return message -> { String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) { //实时数据来了,数据从collect发送来 String payload = (String)message.getPayload(); logger.info("接收到mqtt消息readtime,data={}",payload);//消息体如何定义,数据点名称,值,时间,用json对象传过来 //TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem); JSONObject payLoadJson = JSONObject.parseObject(payload); long workstationId = payLoadJson.getLong("workstationId"); String name = payLoadJson.getString("name");//key? long time = payLoadJson.getLong("time"); String v = payLoadJson.getString("value"); //将数据加入缓存 TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time); WorkstationCache.setWorkstationRealTime(workstationId, name, telemetryDataResponseDTO); //发送websocket消息 RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage(); jsonWebSocketMessage.setId(""+workstationId); //从缓存(或直接查询)加载最新的数据,发送websocket Map map = WorkstationCache.getWorkstationRealTime(workstationId+""); realTimeDataService.addPreTimeInDeviceStatus(workstationId, map); jsonWebSocketMessage.setData(map); MessageDO messageDO = new MessageDO(); messageDO.setNeedBroadcast(false); String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage); logger.info("send_to_page_websockettxt={}",msgtxt); messageDO.setMessageText(msgtxt); messageDistributor.distribute(messageDO); } }; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { Random numList = new Random(); MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound-mdc"+numList.nextInt(50), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(DEFAULT_TOPIC); return messageHandler; } }