Pu Zhibing
2024-11-05 0132ff3faf4d27f6b99af37f8ea8cb9520b1622f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.ruoyi.integration.rocket.util;
 
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.integration.rocket.base.BaseMessage;
import com.ruoyi.integration.rocket.constant.EnhanceMessageConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 
import javax.annotation.Resource;
 
@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
    /**
     * 默认重试次数
     */
    private static final int MAX_RETRY_TIMES = 3;
 
    /**
     * 延时等级
     */
    private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;
 
 
    @Resource
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
 
    /**
     * 消息处理
     *
     * @param message 待处理消息
     * @throws Exception 消费异常
     */
    protected abstract void handleMessage(T message) throws Exception;
 
    /**
     * 超过重试次数消息,需要启用isRetry
     *
     * @param message 待处理消息
     */
    protected abstract void handleMaxRetriesExceeded(T message);
 
 
    /**
     * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    protected boolean filter(T message) {
        return false;
    }
 
 
    /**
     * 是否异常时重复发送
     *
     * @return true: 消息重试,false:不重试
     */
    protected abstract boolean isRetry();
 
    /**
     * 消费异常时是否抛出异常
     * 返回true,则由rocketmq机制自动重试
     * false:消费异常(如果没有开启重试则消息会被自动ack)
     */
    protected abstract boolean throwException();
 
    /**
     * 最大重试次数
     *
     * @return 最大重试次数,默认5次
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }
 
    /**
     * isRetry开启时,重新入队延迟时间
     * @return -1:立即入队重试
     */
    protected int getDelayLevel() {
        return DELAY_LEVEL;
    }
 
    /**
     * 使用模板模式构建消息消费框架,可自由扩展或删减
     */
    public void dispatchMessage(T message) {
        // 基础日志记录被父类处理了
        log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
 
        if (filter(message)) {
            log.info("消息id{}不满足消费条件,已过滤。",message.getKey());
            return;
        }
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > getMaxRetryTimes()) {
            handleMaxRetriesExceeded(message);
            return;
        }
        try {
            long now = System.currentTimeMillis();
            handleMessage(message);
            long costTime = System.currentTimeMillis() - now;
            log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime);
        } catch (Exception e) {
            log.error("消息{}消费异常", message.getKey(),e);
            // 是捕获异常还是抛出,由子类决定
            if (throwException()) {
                //抛出异常,由DefaultMessageListenerConcurrently类处理
                throw new RuntimeException(e);
            }
            //此时如果不开启重试机制,则默认ACK了
            if (isRetry()) {
                handleRetry(message);
            }
        }
    }
 
    protected void handleRetry(T message) {
        // 获取子类RocketMQMessageListener注解拿到topic和tag
        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
        if (annotation == null) {
            return;
        }
        //重新构建消息体
        String messageSource = message.getSource();
        if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
            message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
        }
        message.setRetryTimes(message.getRetryTimes() + 1);
 
        SendResult sendResult;
 
        try {
            // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
            sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
        } catch (Exception ex) {
            // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
            //由生产者直接发送
            throw new RuntimeException(ex);
        }
        // 发送失败的处理就是不进行ACK,由RocketMQ重试
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("重试消息发送失败");
        }
 
    }
 
}