package com.qianwen.core.websocket.distribute; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import java.util.List; import java.util.Properties; import com.qianwen.core.tool.utils.Func; import com.qianwen.core.websocket.custom.SubscriptionSessionFilter; import com.qianwen.core.websocket.holder.JsonMessageFilterHolder; import com.qianwen.core.websocket.message.JsonWebSocketMessage; import org.springframework.core.GenericTypeResolver; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; public class RedisWebsocketMessageListener implements MessageListener, MessageSender { public static final String CHANNEL = "websocket-send"; private final StringRedisTemplate stringRedisTemplate; public RedisWebsocketMessageListener(final StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public void onMessage(Message message, byte[] bytes) { byte[] channelBytes = message.getChannel(); RedisSerializer stringSerializer = this.stringRedisTemplate.getStringSerializer(); String channel = (String) stringSerializer.deserialize(channelBytes); Properties props = System.getProperties(); String appName = props.getProperty("spring.application.name"); String canChannel = Func.isNotEmpty(appName) ? CHANNEL.concat("-").concat(appName) : CHANNEL; if (canChannel.equals(channel)) { byte[] bodyBytes = message.getBody(); String body = (String) stringSerializer.deserialize(bodyBytes); MessageDO messageDO = (MessageDO) JSONUtil.toBean(body, MessageDO.class); sessionFilter(messageDO); doSend(messageDO); } } public void sessionFilter(MessageDO messageDO) { SubscriptionSessionFilter filter; if (Func.isEmpty(messageDO.getSessionIds())) { JSONObject jsonObject = JSONUtil.parseObj(messageDO.getMessageText()); String type = jsonObject != null ? jsonObject.getStr(JsonWebSocketMessage.TYPE_FIELD) : null; if (type != null && (filter = JsonMessageFilterHolder.getFilter(type)) != null) { GenericTypeResolver.resolveTypeArguments(filter.getClass(), SubscriptionSessionFilter.class); List sessionIds = filter.onResponse(messageDO.getMessageText()); messageDO.setSessionIds(sessionIds); } } } }