package com.qianwen.core.websocket.handler; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.qianwen.core.websocket.custom.DefaultTypeSessionFilter; import com.qianwen.core.websocket.custom.SubscriptionSessionFilter; import com.qianwen.core.websocket.holder.JsonMessageFilterHolder; import com.qianwen.core.websocket.holder.JsonMessageHandlerHolder; import com.qianwen.core.websocket.message.JsonWebSocketMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; public class CustomWebSocketHandler extends TextWebSocketHandler { private static final Logger log = LoggerFactory.getLogger(CustomWebSocketHandler.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private PlanTextMessageHandler planTextMessageHandler; static { MAPPER.enable(new JsonParser.Feature[]{JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature()}); } public CustomWebSocketHandler() { } public CustomWebSocketHandler(PlanTextMessageHandler planTextMessageHandler) { this.planTextMessageHandler = planTextMessageHandler; } @Override public void handleTextMessage(WebSocketSession session, TextMessage message) throws JsonProcessingException { if (message.getPayloadLength() == 0) { return; } String payload = (String) message.getPayload(); JsonNode jsonNode = MAPPER.readTree(payload); JsonNode typeNode = jsonNode.get(JsonWebSocketMessage.TYPE_FIELD); if (typeNode == null) { if (this.planTextMessageHandler != null) { this.planTextMessageHandler.handle(session, payload); return; } else { log.error("[handleTextMessage] 普通文本消息({})没有对应的消息处理器", payload); return; } } String messageType = typeNode.asText(); JsonMessageHandler jsonMessageHandler = JsonMessageHandlerHolder.getHandler(messageType); if (jsonMessageHandler == null) { log.error("[handleTextMessage] 消息类型({})不存在对应的消息处理器", messageType); return; } Class messageClass = jsonMessageHandler.getMessageClass(); JsonWebSocketMessage websocketMessageJson = (JsonWebSocketMessage) MAPPER.treeToValue(jsonNode, messageClass); jsonMessageHandler.handle(session, websocketMessageJson); SubscriptionSessionFilter filter = JsonMessageFilterHolder.getFilter(messageType); if (filter == null) { filter = new DefaultTypeSessionFilter(messageType); JsonMessageFilterHolder.addFilter(filter); } filter.onMessage(session, websocketMessageJson); } }