package com.qianwen.smartman.modules.sync.message.sse; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.qianwen.smartman.modules.sync.enums.SseEventEnum; import org.springframework.stereotype.Service; import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; /** * 服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端 */ @Service public class SseEmitterServer { private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class); private static final Map SSE_CACHE = new ConcurrentHashMap<>(); public SseEmitter createSseConnect(String clientId) { if (StrUtil.isEmpty(clientId)) { clientId = IdUtil.simpleUUID(); } try { SseEmitter sseEmitter = new SseEmitter(0L); sseEmitter.onCompletion(onCompletionRunnable(clientId)); sseEmitter.onTimeout(onTimeoutRunnable(clientId)); sseEmitter.onError(errorCallBack(clientId)); SSE_CACHE.put(clientId, sseEmitter); log.info("[SSE服务端].[创建新的sse连接,当前客户端:{}]", clientId); sseEmitter.send(SseEmitter.event().id(SseEventEnum.CREATE_SSE_ID.getEventId()).data(clientId)); return sseEmitter; } catch (IOException e) { log.error("[SSE服务端].[createSseConnect 创建长连接异常,客户端id:{}]", clientId, e); return null; } catch (AsyncRequestTimeoutException e2) { log.info("[SSE服务端].[createSseConnect 连接超时,关闭连接 errMsg={}]", e2.getMessage()); return null; } } public void close(String clientId) { SseEmitter sseEmitter = getSseEmitter(clientId); if (sseEmitter != null) { sseEmitter.complete(); removeClient(clientId); } log.info("[SSE服务端].[close 关闭连接 clientId={}]", clientId); } public SseEmitter getSseEmitter(String clientId) { return SSE_CACHE.get(clientId); } public void sendMsg(String clientId, Object data, String eventId) { SseEmitter sseEmitter = getSseEmitter(clientId); if (sseEmitter == null) { log.error("[SSE服务端].[推送消息失败:客户端{}未创建连接]", clientId); return; } SseEmitter.SseEventBuilder builderData = SseEmitter.event().id(eventId).data(data); try { sseEmitter.send(builderData); } catch (IOException e) { log.error("[SSE服务端].[推送消息失败]", e); } } public void sendAll(Object data, String eventId) { SSE_CACHE.forEach((k, v) -> { sendMsg(k, data, eventId); }); } private Runnable onCompletionRunnable(String clientId) { return () -> { log.info("结束连接:{}", clientId); close(clientId); }; } private Runnable onTimeoutRunnable(String clientId) { return () -> { log.info("连接超时:{}", clientId); close(clientId); }; } private Consumer errorCallBack(String clientId) { return throwable -> { log.info("连接异常:{}", clientId); close(clientId); }; } private void removeClient(String clientId) { SSE_CACHE.remove(clientId); log.info("[SSE服务端].[SseEmitterServer 清除客户端:{}]", clientId); } }