| | |
| | | import com.ruoyi.integration.rocket.configuration.RocketEnhanceProperties; |
| | | import lombok.RequiredArgsConstructor; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| | | import org.apache.rocketmq.client.producer.SendResult; |
| | | import org.apache.rocketmq.spring.core.RocketMQTemplate; |
| | | import org.apache.rocketmq.spring.support.RocketMQHeaders; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.messaging.Message; |
| | | import org.springframework.messaging.support.MessageBuilder; |
| | | import org.springframework.util.StringUtils; |
| | |
| | | private RocketEnhanceProperties rocketEnhanceProperties; |
| | | |
| | | public RocketMQTemplate getTemplate() { |
| | | // DefaultMQProducer producer = new DefaultMQProducer(); |
| | | // producer.setProducerGroup("enhance_consumer_group"); |
| | | // template.setProducer(producer); |
| | | return template; |
| | | } |
| | | |
| | |
| | | public <T extends BaseMessage> SendResult send(String destination, T message) { |
| | | // 设置业务键,此处根据公共的参数进行处理 |
| | | // 更多的其它基础业务处理... |
| | | Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); |
| | | Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); |
| | | SendResult sendResult = template.syncSend(destination, sendMessage); |
| | | // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 |
| | | log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
| | | log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
| | | return sendResult; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) { |
| | | Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); |
| | | Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); |
| | | SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); |
| | | log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
| | | return sendResult; |