package com.qianwen.core.websocket.handler;
|
|
import com.fasterxml.jackson.core.JsonParser;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.json.JsonReadFeature;
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import com.qianwen.core.websocket.custom.DefaultTypeSessionFilter;
|
import com.qianwen.core.websocket.custom.SubscriptionSessionFilter;
|
import com.qianwen.core.websocket.holder.JsonMessageFilterHolder;
|
import com.qianwen.core.websocket.holder.JsonMessageHandlerHolder;
|
import com.qianwen.core.websocket.message.JsonWebSocketMessage;
|
import org.springframework.web.socket.TextMessage;
|
import org.springframework.web.socket.WebSocketSession;
|
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
|
public class CustomWebSocketHandler extends TextWebSocketHandler {
|
private static final Logger log = LoggerFactory.getLogger(CustomWebSocketHandler.class);
|
private static final ObjectMapper MAPPER = new ObjectMapper();
|
private PlanTextMessageHandler planTextMessageHandler;
|
|
static {
|
MAPPER.enable(new JsonParser.Feature[]{JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature()});
|
}
|
|
public CustomWebSocketHandler() {
|
}
|
|
public CustomWebSocketHandler(PlanTextMessageHandler planTextMessageHandler) {
|
this.planTextMessageHandler = planTextMessageHandler;
|
}
|
|
public void handleTextMessage(WebSocketSession session, TextMessage message) throws JsonProcessingException {
|
if (message.getPayloadLength() == 0) {
|
return;
|
}
|
String payload = (String) message.getPayload();
|
JsonNode jsonNode = MAPPER.readTree(payload);
|
JsonNode typeNode = jsonNode.get(JsonWebSocketMessage.TYPE_FIELD);
|
if (typeNode == null) {
|
if (this.planTextMessageHandler != null) {
|
this.planTextMessageHandler.handle(session, payload);
|
return;
|
} else {
|
log.error("[handleTextMessage] 普通文本消息({})没有对应的消息处理器", payload);
|
return;
|
}
|
}
|
String messageType = typeNode.asText();
|
JsonMessageHandler jsonMessageHandler = JsonMessageHandlerHolder.getHandler(messageType);
|
if (jsonMessageHandler == null) {
|
log.error("[handleTextMessage] 消息类型({})不存在对应的消息处理器", messageType);
|
return;
|
}
|
Class<? extends JsonWebSocketMessage> messageClass = jsonMessageHandler.getMessageClass();
|
JsonWebSocketMessage websocketMessageJson = (JsonWebSocketMessage) MAPPER.treeToValue(jsonNode, messageClass);
|
jsonMessageHandler.handle(session, websocketMessageJson);
|
SubscriptionSessionFilter filter = JsonMessageFilterHolder.getFilter(messageType);
|
if (filter == null) {
|
filter = new DefaultTypeSessionFilter(messageType);
|
JsonMessageFilterHolder.addFilter(filter);
|
}
|
filter.onMessage(session, websocketMessageJson);
|
}
|
}
|