| | |
| | | // 注意分隔符 |
| | | return send(buildDestination(topic,tag), message); |
| | | } |
| | | |
| | | /** |
| | | * 发送异步消息 |
| | | */ |
| | | public <T extends BaseMessage> void sendAsynchronous(String topic, String tag, T message) { |
| | | // 注意分隔符 |
| | | sendAsynchronous(buildDestination(topic,tag), message); |
| | | } |
| | | |
| | | public <T extends BaseMessage> SendResult send(String destination, T message) { |
| | | // 设置业务键,此处根据公共的参数进行处理 |
| | |
| | | log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
| | | return sendResult; |
| | | } |
| | | |
| | | public <T extends BaseMessage> void sendAsynchronous(String destination, T message) { |
| | | try { |
| | | // 设置业务键,此处根据公共的参数进行处理 |
| | | // 更多的其它基础业务处理... |
| | | Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); |
| | | template.send(destination, sendMessage); |
| | | // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 |
| | | log.info("[{}]异步消息[{}]", destination, JSONObject.toJSON(message)); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送延迟消息 |