package com.qianwen.smartman.modules.sync.message.sse;
|
|
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.StrUtil;
|
import java.io.IOException;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.function.Consumer;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import com.qianwen.smartman.modules.sync.enums.SseEventEnum;
|
import org.springframework.stereotype.Service;
|
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
/**
|
* 服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端
|
*/
|
@Service
|
public class SseEmitterServer {
|
private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class);
|
private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
|
|
public SseEmitter createSseConnect(String clientId) {
|
if (StrUtil.isEmpty(clientId)) {
|
clientId = IdUtil.simpleUUID();
|
}
|
try {
|
SseEmitter sseEmitter = new SseEmitter(0L);
|
sseEmitter.onCompletion(onCompletionRunnable(clientId));
|
sseEmitter.onTimeout(onTimeoutRunnable(clientId));
|
sseEmitter.onError(errorCallBack(clientId));
|
SSE_CACHE.put(clientId, sseEmitter);
|
log.info("[SSE服务端].[创建新的sse连接,当前客户端:{}]", clientId);
|
sseEmitter.send(SseEmitter.event().id(SseEventEnum.CREATE_SSE_ID.getEventId()).data(clientId));
|
return sseEmitter;
|
} catch (IOException e) {
|
log.error("[SSE服务端].[createSseConnect 创建长连接异常,客户端id:{}]", clientId, e);
|
return null;
|
} catch (AsyncRequestTimeoutException e2) {
|
log.info("[SSE服务端].[createSseConnect 连接超时,关闭连接 errMsg={}]", e2.getMessage());
|
return null;
|
}
|
}
|
|
public void close(String clientId) {
|
SseEmitter sseEmitter = getSseEmitter(clientId);
|
if (sseEmitter != null) {
|
sseEmitter.complete();
|
removeClient(clientId);
|
}
|
log.info("[SSE服务端].[close 关闭连接 clientId={}]", clientId);
|
}
|
|
public SseEmitter getSseEmitter(String clientId) {
|
return SSE_CACHE.get(clientId);
|
}
|
|
public void sendMsg(String clientId, Object data, String eventId) {
|
SseEmitter sseEmitter = getSseEmitter(clientId);
|
if (sseEmitter == null) {
|
log.error("[SSE服务端].[推送消息失败:客户端{}未创建连接]", clientId);
|
return;
|
}
|
SseEmitter.SseEventBuilder builderData = SseEmitter.event().id(eventId).data(data);
|
try {
|
sseEmitter.send(builderData);
|
} catch (IOException e) {
|
log.error("[SSE服务端].[推送消息失败]", e);
|
}
|
}
|
|
public void sendAll(Object data, String eventId) {
|
SSE_CACHE.forEach((k, v) -> {
|
sendMsg(k, data, eventId);
|
});
|
}
|
|
private Runnable onCompletionRunnable(String clientId) {
|
return () -> {
|
log.info("结束连接:{}", clientId);
|
close(clientId);
|
};
|
}
|
|
private Runnable onTimeoutRunnable(String clientId) {
|
return () -> {
|
log.info("连接超时:{}", clientId);
|
close(clientId);
|
};
|
}
|
|
private Consumer<Throwable> errorCallBack(String clientId) {
|
return throwable -> {
|
log.info("连接异常:{}", clientId);
|
close(clientId);
|
};
|
}
|
|
private void removeClient(String clientId) {
|
SSE_CACHE.remove(clientId);
|
log.info("[SSE服务端].[SseEmitterServer 清除客户端:{}]", clientId);
|
}
|
}
|