package com.ruoyi.integration.rocket.util;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.ruoyi.integration.rocket.base.BaseMessage;
|
import com.ruoyi.integration.rocket.configuration.RocketEnhanceProperties;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.messaging.Message;
|
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.util.StringUtils;
|
|
import javax.annotation.Resource;
|
|
@Slf4j
|
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
|
public class RocketMQEnhanceTemplate {
|
private final RocketMQTemplate template;
|
|
@Resource
|
private RocketEnhanceProperties rocketEnhanceProperties;
|
|
public RocketMQTemplate getTemplate() {
|
return template;
|
}
|
|
/**
|
* 根据系统上下文自动构建隔离后的topic
|
* 构建目的地
|
*/
|
public String buildDestination(String topic, String tag) {
|
topic = reBuildTopic(topic);
|
return topic + ":" + tag;
|
}
|
|
/**
|
* 根据环境重新隔离topic
|
* @param topic 原始topic
|
*/
|
private String reBuildTopic(String topic) {
|
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
|
return topic +"_" + rocketEnhanceProperties.getEnvironment();
|
}
|
return topic;
|
}
|
|
/**
|
* 发送同步消息
|
*/
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
|
// 注意分隔符
|
return send(buildDestination(topic,tag), message);
|
}
|
|
|
public <T extends BaseMessage> SendResult send(String destination, T message) {
|
// 设置业务键,此处根据公共的参数进行处理
|
// 更多的其它基础业务处理...
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build();
|
SendResult sendResult = template.syncSend(destination, sendMessage);
|
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
|
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
|
return sendResult;
|
}
|
|
/**
|
* 发送延迟消息
|
*/
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
|
return send(buildDestination(topic,tag), message, delayLevel);
|
}
|
|
public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build();
|
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
|
log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
|
return sendResult;
|
}
|
}
|