package com.qianwen.core.websocket.distribute;
|
|
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONUtil;
|
import java.util.List;
|
import java.util.Properties;
|
import com.qianwen.core.tool.utils.Func;
|
import com.qianwen.core.websocket.custom.SubscriptionSessionFilter;
|
import com.qianwen.core.websocket.holder.JsonMessageFilterHolder;
|
import com.qianwen.core.websocket.message.JsonWebSocketMessage;
|
import org.springframework.core.GenericTypeResolver;
|
import org.springframework.data.redis.connection.Message;
|
import org.springframework.data.redis.connection.MessageListener;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.serializer.RedisSerializer;
|
|
public class RedisWebsocketMessageListener implements MessageListener, MessageSender {
|
public static final String CHANNEL = "websocket-send";
|
private final StringRedisTemplate stringRedisTemplate;
|
|
public RedisWebsocketMessageListener(final StringRedisTemplate stringRedisTemplate) {
|
this.stringRedisTemplate = stringRedisTemplate;
|
}
|
|
public void onMessage(Message message, byte[] bytes) {
|
byte[] channelBytes = message.getChannel();
|
RedisSerializer<String> stringSerializer = this.stringRedisTemplate.getStringSerializer();
|
String channel = (String) stringSerializer.deserialize(channelBytes);
|
Properties props = System.getProperties();
|
String appName = props.getProperty("spring.application.name");
|
String canChannel = Func.isNotEmpty(appName) ? CHANNEL.concat("-").concat(appName) : CHANNEL;
|
if (canChannel.equals(channel)) {
|
byte[] bodyBytes = message.getBody();
|
String body = (String) stringSerializer.deserialize(bodyBytes);
|
MessageDO messageDO = (MessageDO) JSONUtil.toBean(body, MessageDO.class);
|
sessionFilter(messageDO);
|
doSend(messageDO);
|
}
|
}
|
|
public void sessionFilter(MessageDO messageDO) {
|
SubscriptionSessionFilter filter;
|
if (Func.isEmpty(messageDO.getSessionIds())) {
|
JSONObject jsonObject = JSONUtil.parseObj(messageDO.getMessageText());
|
String type = jsonObject != null ? jsonObject.getStr(JsonWebSocketMessage.TYPE_FIELD) : null;
|
if (type != null && (filter = JsonMessageFilterHolder.getFilter(type)) != null) {
|
GenericTypeResolver.resolveTypeArguments(filter.getClass(), SubscriptionSessionFilter.class);
|
List<String> sessionIds = filter.onResponse(messageDO.getMessageText());
|
messageDO.setSessionIds(sessionIds);
|
}
|
}
|
}
|
}
|