yangys
2024-09-04 04c57331cf84c8f606c2838dcb6fe5463fb9b68c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.qianwen.smartman.modules.notify.message.rocket.consumer;
 
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import com.qianwen.core.notify.event.SerializableNotifierEvent;
import com.qianwen.smartman.modules.notify.entity.NotifyHistoryEntity;
import com.qianwen.smartman.modules.notify.service.INotifyHistoryService;
 
@RocketMQMessageListener(consumerGroup = "notify-center-history-consumerblade-api", topic = "notify-center-history-topic", selectorExpression = "success||error")
 
public class NotifyHistoryConsumer implements RocketMQListener<SerializableNotifierEvent>, RocketMQPushConsumerLifecycleListener {
    private final INotifyHistoryService notifyHistoryService;
 
    public NotifyHistoryConsumer(final INotifyHistoryService notifyHistoryService) {
        this.notifyHistoryService = notifyHistoryService;
    }
 
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setMaxReconsumeTimes(3);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }
 
    public void onMessage(SerializableNotifierEvent serializableNotifierEvent) {
        this.notifyHistoryService.save(NotifyHistoryEntity.of(serializableNotifierEvent));
    }
}