无关风月
22 小时以前 94fee5042cc79cd5de82dbfc3438c1a4747861ef
政务云rocketMQ版本更换
3个文件已修改
108 ■■■■■ 已修改文件
ruoyi-service/ruoyi-jianguan/pom.xml 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/RuoYiJianGuanApplication.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/rocket/produce/ChargingMessageListener.java 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-jianguan/pom.xml
@@ -106,30 +106,36 @@
        </dependency>
        <!--rocketmq-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.2.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>-->
<!--            <version>2.2.2.RELEASE</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <groupId>org.apache.rocketmq</groupId>-->
<!--                    <artifactId>rocketmq-client</artifactId>-->
<!--                </exclusion>-->
<!--                <exclusion>-->
<!--                    <groupId>org.apache.rocketmq</groupId>-->
<!--                    <artifactId>rocketmq-acl</artifactId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->
<!--        -->
<!--        <dependency>-->
<!--            <groupId>org.apache.rocketmq</groupId>-->
<!--            <artifactId>rocketmq-client</artifactId>-->
<!--            <version>4.7.1</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.rocketmq</groupId>-->
<!--            <artifactId>rocketmq-acl</artifactId>-->
<!--            <version>4.7.1</version>-->
<!--        </dependency>-->
        <!--mongodb-->
        <dependency>
ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/RuoYiJianGuanApplication.java
@@ -20,7 +20,6 @@
@EnableRyFeignClients
@SpringBootApplication
@EnableScheduling//开启定时任务
@EnableBinding({ Source.class, Sink.class })
public class RuoYiJianGuanApplication {
    public static void main(String[] args) {
        SpringApplication.run(RuoYiJianGuanApplication.class, args);
ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/rocket/produce/ChargingMessageListener.java
@@ -23,8 +23,6 @@
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -42,10 +40,9 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "jianguan_message",
        topic = "jianguan_message",
        selectorExpression = "jianguan_message",
        consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "jianguan_message"
)
public class ChargingMessageListener extends EnhanceMessageHandler<JianGuanMessage> implements RocketMQListener<JianGuanMessage> {
public class ChargingMessageListener implements RocketMQListener<JianGuanMessage> {
    @Resource
    private ChargingOrderClient chargingOrderClient;
@@ -66,9 +63,12 @@
    private final static String operatorId = "906171535";
    @StreamListener("input")
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    protected void handleMessage(JianGuanMessage message) throws Exception {
    public void onMessage(JianGuanMessage message) {
        log.info("rocket收到的消息内容:{}", message);
        String serviceId = message.getServiceId();
        if (!StringUtils.hasLength(serviceId)) {
@@ -297,49 +297,6 @@
                break;
        }
    }
    /**
     * Callback for a message that has exceeded the specified number of retries.
     * This implementation only writes an error log entry.
     *
     * @param message the message whose consumption failed
     */
    @Override
    protected void handleMaxRetriesExceeded(JianGuanMessage message) {
        // This method is called when a message has exceeded the specified number of retries.
        // In production, a rollback or other business operation can be performed here.
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * Whether the retry mechanism should be executed.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    /**
     * Whether an exception should be thrown.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean throwException() {
        // Whether to throw the exception; false is paired with retry so that exceptions are handled on our own.
        return false;
    }
    /**
     * If message filtering is needed, handle it uniformly in the parent class,
     * or implement it here and handle it yourself.
     *
     * @param message the message to be processed
     * @return true: this message is filtered out; false: not filtered
     */
    @Override
    protected boolean filter(JianGuanMessage message) {
        // Message filtering can be done here; currently nothing is filtered.
        return false;
    }
    /**
     * Listens for consumed messages. No business processing is done here: the message is
     * delegated to the parent class for the basic operations, and once those are done the
     * parent calls the subclass's actual handling method.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(JianGuanMessage message) {
        super.dispatchMessage(message);
    }
}