package com.qianwen.smartman.modules.sync.message.consumer; import cn.hutool.json.JSONObject; import java.util.Objects; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.qianwen.smartman.modules.sync.constant.QyWechatConstant; import com.qianwen.smartman.modules.sync.enums.SseEventEnum; import com.qianwen.smartman.modules.sync.message.sse.SseEmitterServer; import com.qianwen.smartman.modules.sync.util.CodeUtil; import org.springframework.stereotype.Component; /** * 企业微信消息消费 */ @RocketMQMessageListener(topic = QyWechatConstant.WECHAT_TOPIC, consumerGroup = QyWechatConstant.WECHAT_GROUP) @Component public class QyWechatSyncConsumer implements RocketMQListener { private static final Logger log = LoggerFactory.getLogger(QyWechatSyncConsumer.class); private final SseEmitterServer sseEmitterServer; public QyWechatSyncConsumer(final SseEmitterServer sseEmitterServer) { this.sseEmitterServer = sseEmitterServer; } public void onMessage(JSONObject message) { log.info("[企业微信二维码消息消费者].[收到延迟消息 message={}]", message); if (Objects.isNull(message)) { return; } String clientId = message.getStr("clientId"); Integer type = message.getInt("type"); Long empId = message.getLong("empId"); String cacheKey = type.equals(QyWechatConstant.QY_CODE_LOGIN_TYPE) ? CodeUtil.getLoginCodeCacheKey(clientId) : CodeUtil.getBindCodeCacheKey(clientId, empId); if ((type.equals(QyWechatConstant.QY_CODE_LOGIN_TYPE) ? CodeUtil.verifyLoginCodeExpire(clientId) : CodeUtil.verifyBindCodeExpire(clientId, empId)).booleanValue()) { return; } this.sseEmitterServer.sendMsg(clientId, Boolean.TRUE, Objects.equals(type, QyWechatConstant.QY_CODE_LOGIN_TYPE) ? SseEventEnum.LOGIN_CODE_EXPIRE_ID.getEventId() : SseEventEnum.BIND_CODE_EXPIRE_ID.getEventId()); CodeUtil.failureCode(cacheKey); } }