Pu Zhibing
2025-06-02 b4bfd886af0460ae84b2d70233c2a77866930a46
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java
@@ -5,10 +5,12 @@
import com.ruoyi.integration.rocket.configuration.RocketEnhanceProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
@@ -24,9 +26,6 @@
    private RocketEnhanceProperties rocketEnhanceProperties;
    public RocketMQTemplate getTemplate() {
//        DefaultMQProducer producer = new DefaultMQProducer();
//        producer.setProducerGroup("enhance_consumer_group");
//        template.setProducer(producer);
        return template;
    }
@@ -57,16 +56,35 @@
        // 注意分隔符
        return send(buildDestination(topic,tag), message);
    }
    /**
     * 发送异步消息
     */
    public <T extends BaseMessage> void sendAsynchronous(String topic, String tag, T message) {
        // 注意分隔符
        sendAsynchronous(buildDestination(topic,tag), message);
    }
    public <T extends BaseMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
    public <T extends BaseMessage> void sendAsynchronous(String destination, T message) {
        try {
            // 设置业务键,此处根据公共的参数进行处理
            // 更多的其它基础业务处理...
            Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build();
            template.send(destination, sendMessage);
            // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
            log.info("[{}]异步消息[{}]", destination, JSONObject.toJSON(message));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
@@ -77,7 +95,7 @@
    }
    public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;