package com.qianwen.smartman.common.config;
|
|
import java.util.Arrays;
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.Random;
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.qianwen.core.websocket.distribute.MessageDO;
|
import com.qianwen.core.websocket.distribute.RedisMessageDistributor;
|
import com.qianwen.smartman.common.cache.cps.WorkstationCache;
|
import com.qianwen.smartman.common.websocket.realtime.RealTimeDaraResponseJsonWebSocketMessage;
|
import com.qianwen.smartman.modules.smis.message.dto.TelemetryDataResponseDTO;
|
import com.qianwen.smartman.modules.mdc.dto.ProcessParameterVO;
|
import com.qianwen.smartman.modules.mdc.entity.WorkstationCollectData;
|
import com.qianwen.smartman.modules.mdc.mapper.SuperProcessParameterMapper;
|
import com.qianwen.smartman.modules.mdc.service.RealTimeDataService;
|
|
import cn.hutool.core.date.DateTime;
|
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.json.JSONUtil;
|
|
@Configuration
|
public class MdcMqttConfig {
|
private Logger logger = LoggerFactory.getLogger(this.getClass());
|
@Value("${mqtt.host}")
|
private String mqttHost;
|
|
@Value("${mqtt.username:}")
|
private String mqttUserName;
|
|
@Value("${mqtt.password:}")
|
private String mqttPassword;
|
@Value("${mqtt.isRecevieMsg:false}")
|
private boolean isRecevieMsg;
|
@Autowired
|
private RedisMessageDistributor messageDistributor;
|
|
//@Autowired
|
//private SuperProcessParameterMapper processParamMapper;
|
|
@Autowired
|
private RealTimeDataService realTimeDataService;
|
public static final String DEFAULT_TOPIC = "mdc";
|
/**
|
* 反馈创建的topic(mdc中),本应用接收并处理
|
*/
|
//public static final String FEEDBACK_TOPIC = "mdc/feedback";
|
|
public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
|
|
public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
|
@Bean
|
public MqttPahoClientFactory mqttClientFactory() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884"
|
|
if(ObjectUtil.isNotEmpty(mqttUserName)) {
|
options.setUserName(mqttUserName);
|
options.setPassword(mqttPassword.toCharArray());
|
}
|
factory.setConnectionOptions(options);
|
return factory;
|
}
|
|
@Bean
|
public MessageChannel mqttInputChannel() {
|
return new DirectChannel();
|
}
|
|
/**
|
*
|
* @return
|
*/
|
|
@Bean
|
public MessageProducer inbound() {
|
java.util.Random r = new java.util.Random();
|
|
String clientId = "spring-boot-mdc-mqtt-client-inbound"+r.nextInt(1000);
|
|
MqttPahoMessageDrivenChannelAdapter adapter;
|
if(!this.isRecevieMsg) {
|
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
|
mqttClientFactory(),DEFAULT_TOPIC);
|
}else {
|
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
|
mqttClientFactory(),WOCKSTATION_REALTIMEDATA_TOPIC);//最后一个参数允许多个topic参数
|
}
|
|
adapter.setCompletionTimeout(5000);
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
adapter.setQos(1);
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
|
/**
|
* 入站消息处理
|
* @return
|
*/
|
@Bean
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
public MessageHandler handler() {
|
|
return message -> {
|
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
|
if(WOCKSTATION_REALTIMEDATA_TOPIC.equals(topic) ) {
|
//实时数据来了,数据从collect发送来
|
String payload = (String)message.getPayload();
|
logger.info("接收到mqtt消息readtime,data={}",payload);//消息体如何定义,数据点名称,值,时间,用json对象传过来
|
//TelemetryDataMessage result = TelemetryDataUtils.handleTelemetryDataMessage(telemetryDataMessage, workStationItem);
|
JSONObject payLoadJson = JSONObject.parseObject(payload);
|
long workstationId = payLoadJson.getLong("workstationId");
|
String name = payLoadJson.getString("name");//key?
|
long time = payLoadJson.getLong("time");
|
String v = payLoadJson.getString("value");
|
|
//将数据加入缓存
|
TelemetryDataResponseDTO telemetryDataResponseDTO = new TelemetryDataResponseDTO(v, time);
|
WorkstationCache.setWorkstationRealTime(workstationId, name, telemetryDataResponseDTO);
|
|
//发送websocket消息
|
RealTimeDaraResponseJsonWebSocketMessage jsonWebSocketMessage = new RealTimeDaraResponseJsonWebSocketMessage();
|
jsonWebSocketMessage.setId(""+workstationId);
|
|
|
//从缓存(或直接查询)加载最新的数据,发送websocket
|
Map<String, Object> map = WorkstationCache.getWorkstationRealTime(workstationId+"");
|
|
realTimeDataService.addPreTimeInDeviceStatus(workstationId, map);
|
|
jsonWebSocketMessage.setData(map);
|
MessageDO messageDO = new MessageDO();
|
messageDO.setNeedBroadcast(false);
|
String msgtxt =JSONUtil.toJsonStr(jsonWebSocketMessage);
|
logger.info("send_to_page_websockettxt={}",msgtxt);
|
messageDO.setMessageText(msgtxt);
|
|
messageDistributor.distribute(messageDO);
|
}
|
};
|
}
|
|
@Bean
|
public MessageChannel mqttOutboundChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = "mqttOutboundChannel")
|
public MessageHandler mqttOutbound() {
|
Random numList = new Random();
|
|
MqttPahoMessageHandler messageHandler =
|
new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound-mdc"+numList.nextInt(50), mqttClientFactory());
|
messageHandler.setAsync(true);
|
messageHandler.setDefaultTopic(DEFAULT_TOPIC);
|
return messageHandler;
|
}
|
}
|