From b4bfd886af0460ae84b2d70233c2a77866930a46 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期一, 02 六月 2025 15:25:21 +0800 Subject: [PATCH] 修改bug和将roacktmq连接方式切换为直连方式 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java | 32 +++++++++++++++++++++++++------- 1 files changed, 25 insertions(+), 7 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java index 69773c3..b6a1141 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java @@ -5,10 +5,12 @@ import com.ruoyi.integration.rocket.configuration.RocketEnhanceProperties; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; @@ -24,9 +26,6 @@ private RocketEnhanceProperties rocketEnhanceProperties; public RocketMQTemplate getTemplate() { -// DefaultMQProducer producer = new DefaultMQProducer(); -// producer.setProducerGroup("enhance_consumer_group"); -// template.setProducer(producer); return template; } @@ -57,16 +56,35 @@ // 注意分隔符 return send(buildDestination(topic,tag), message); } - + /** + * 发送异步消息 + */ + public <T extends BaseMessage> void sendAsynchronous(String topic, String tag, T message) { + // 注意分隔符 + sendAsynchronous(buildDestination(topic,tag), message); + } public <T extends BaseMessage> SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... - Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 - log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; + } + + public <T extends BaseMessage> void sendAsynchronous(String destination, T message) { + try { + // 设置业务键,此处根据公共的参数进行处理 + // 更多的其它基础业务处理... + Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); + template.send(destination, sendMessage); + // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 + log.info("[{}]异步消息[{}]", destination, JSONObject.toJSON(message)); + }catch (Exception e){ + e.printStackTrace(); + } } /** @@ -77,7 +95,7 @@ } public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) { - Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; -- Gitblit v1.7.1