package com.ruoyi.integration.rocket.util; import com.alibaba.fastjson.JSONObject; import com.ruoyi.integration.rocket.base.BaseMessage; import com.ruoyi.integration.rocket.configuration.RocketEnhanceProperties; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; import javax.annotation.Resource; @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class RocketMQEnhanceTemplate { private final RocketMQTemplate template; @Resource private RocketEnhanceProperties rocketEnhanceProperties; public RocketMQTemplate getTemplate() { return template; } /** * 根据系统上下文自动构建隔离后的topic * 构建目的地 */ public String buildDestination(String topic, String tag) { topic = reBuildTopic(topic); return topic + ":" + tag; } /** * 根据环境重新隔离topic * @param topic 原始topic */ private String reBuildTopic(String topic) { if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ return topic +"_" + rocketEnhanceProperties.getEnvironment(); } return topic; } /** * 发送同步消息 */ public SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(buildDestination(topic,tag), message); } public SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } /** * 发送延迟消息 */ public SendResult send(String topic, String tag, T message, int delayLevel) { return send(buildDestination(topic,tag), message, delayLevel); } public SendResult send(String destination, T message, int delayLevel) { Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } }