yangys
2024-05-07 ec2552d891e163bd9054e0554188afc575bcdb70
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.qianwen.smartman.modules.sync.message.sse;
 
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.qianwen.smartman.modules.sync.enums.SseEventEnum;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * 服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端
 */
@Service
public class SseEmitterServer {
    private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class);
    private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
 
    public SseEmitter createSseConnect(String clientId) {
        if (StrUtil.isEmpty(clientId)) {
            clientId = IdUtil.simpleUUID();
        }
        try {
            SseEmitter sseEmitter = new SseEmitter(0L);
            sseEmitter.onCompletion(onCompletionRunnable(clientId));
            sseEmitter.onTimeout(onTimeoutRunnable(clientId));
            sseEmitter.onError(errorCallBack(clientId));
            SSE_CACHE.put(clientId, sseEmitter);
            log.info("[SSE服务端].[创建新的sse连接,当前客户端:{}]", clientId);
            sseEmitter.send(SseEmitter.event().id(SseEventEnum.CREATE_SSE_ID.getEventId()).data(clientId));
            return sseEmitter;
        } catch (IOException e) {
            log.error("[SSE服务端].[createSseConnect 创建长连接异常,客户端id:{}]", clientId, e);
            return null;
        } catch (AsyncRequestTimeoutException e2) {
            log.info("[SSE服务端].[createSseConnect 连接超时,关闭连接 errMsg={}]", e2.getMessage());
            return null;
        }
    }
 
    public void close(String clientId) {
        SseEmitter sseEmitter = getSseEmitter(clientId);
        if (sseEmitter != null) {
            sseEmitter.complete();
            removeClient(clientId);
        }
        log.info("[SSE服务端].[close 关闭连接 clientId={}]", clientId);
    }
 
    public SseEmitter getSseEmitter(String clientId) {
        return SSE_CACHE.get(clientId);
    }
 
    public void sendMsg(String clientId, Object data, String eventId) {
        SseEmitter sseEmitter = getSseEmitter(clientId);
        if (sseEmitter == null) {
            log.error("[SSE服务端].[推送消息失败:客户端{}未创建连接]", clientId);
            return;
        }
        SseEmitter.SseEventBuilder builderData = SseEmitter.event().id(eventId).data(data);
        try {
            sseEmitter.send(builderData);
        } catch (IOException e) {
            log.error("[SSE服务端].[推送消息失败]", e);
        }
    }
 
    public void sendAll(Object data, String eventId) {
        SSE_CACHE.forEach((k, v) -> {
            sendMsg(k, data, eventId);
        });
    }
 
    private Runnable onCompletionRunnable(String clientId) {
        return () -> {
            log.info("结束连接:{}", clientId);
            close(clientId);
        };
    }
 
    private Runnable onTimeoutRunnable(String clientId) {
        return () -> {
            log.info("连接超时:{}", clientId);
            close(clientId);
        };
    }
 
    private Consumer<Throwable> errorCallBack(String clientId) {
        return throwable -> {
            log.info("连接异常:{}", clientId);
            close(clientId);
        };
    }
 
    private void removeClient(String clientId) {
        SSE_CACHE.remove(clientId);
        log.info("[SSE服务端].[SseEmitterServer 清除客户端:{}]", clientId);
    }
}