yangys
2024-05-18 cc0bdfb33ef638dfafe3185c92c7076d815e1c9b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.qianwen.core.websocket.distribute;
 
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.util.List;
import java.util.Properties;
import com.qianwen.core.tool.utils.Func;
import com.qianwen.core.websocket.custom.SubscriptionSessionFilter;
import com.qianwen.core.websocket.holder.JsonMessageFilterHolder;
import com.qianwen.core.websocket.message.JsonWebSocketMessage;
import org.springframework.core.GenericTypeResolver;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
 
public class RedisWebsocketMessageListener implements MessageListener, MessageSender {
    public static final String CHANNEL = "websocket-send";
    private final StringRedisTemplate stringRedisTemplate;
 
    public RedisWebsocketMessageListener(final StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
 
    public void onMessage(Message message, byte[] bytes) {
        byte[] channelBytes = message.getChannel();
        RedisSerializer<String> stringSerializer = this.stringRedisTemplate.getStringSerializer();
        String channel = (String) stringSerializer.deserialize(channelBytes);
        Properties props = System.getProperties();
        String appName = props.getProperty("spring.application.name");
        String canChannel = Func.isNotEmpty(appName) ? CHANNEL.concat("-").concat(appName) : CHANNEL;
        if (canChannel.equals(channel)) {
            byte[] bodyBytes = message.getBody();
            String body = (String) stringSerializer.deserialize(bodyBytes);
            MessageDO messageDO = (MessageDO) JSONUtil.toBean(body, MessageDO.class);
            sessionFilter(messageDO);
            doSend(messageDO);
        }
    }
 
    public void sessionFilter(MessageDO messageDO) {
        SubscriptionSessionFilter filter;
        if (Func.isEmpty(messageDO.getSessionIds())) {
            JSONObject jsonObject = JSONUtil.parseObj(messageDO.getMessageText());
            String type = jsonObject != null ? jsonObject.getStr(JsonWebSocketMessage.TYPE_FIELD) : null;
            if (type != null && (filter = JsonMessageFilterHolder.getFilter(type)) != null) {
                GenericTypeResolver.resolveTypeArguments(filter.getClass(), SubscriptionSessionFilter.class);
                List<String> sessionIds = filter.onResponse(messageDO.getMessageText());
                messageDO.setSessionIds(sessionIds);
            }
        }
    }
}