无关风月
2024-09-10 caa2b51bb4133f5a2e59eac242e744fb57f864a0
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java
@@ -10,6 +10,7 @@
import com.ruoyi.integration.rocket.model.OnlineMessage;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
@@ -19,19 +20,16 @@
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_online",
        topic = "charge_online",
        selectorExpression = "online", // 明确指定标签
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class OnlineMessageListener extends EnhanceMessageHandler<OnlineMessage> implements RocketMQListener<OnlineMessage> {
    @Autowired
    private OnlineService onlineService;
    @Autowired
    private MessageUtil messageUtil;
    @Autowired
    private IotMessageProduce iotMessageProduce;
    /**
     * Processes a single {@link OnlineMessage} received by this listener.
     *
     * <p>Steps visible in this excerpt:
     * <ol>
     *   <li>Create an {@code Online} object and copy the message's properties onto it with
     *       {@code BeanUtils.copyProperties}.</li>
     *   <li>Pass that object to {@code onlineService.create}.</li>
     *   <li>Build an {@code OnlineReply} carrying the same charging pile code and an
     *       {@code online_result} of {@code 0}, send it through
     *       {@code iotMessageProduce.sendMessage} using the {@code ServiceIdMenu.ONLINE_REPLY}
     *       key, and log the returned string.</li>
     * </ol>
     *
     * <p>NOTE(review): the statements between the method signature and the {@code Online}
     * construction are not visible in this excerpt, so this summary may be incomplete.
     *
     * @param message the incoming message; its properties are copied onto the {@code Online}
     *                object and its charging pile code is reused for the reply
     * @throws Exception declared by the signature; which of the calls below can throw is not
     *                   visible from here
     */
    @Override
    protected void handleMessage(OnlineMessage message) throws Exception {
@@ -41,12 +39,7 @@
        Online online = new Online();
        // Copies same-named properties from the message onto the new Online object.
        BeanUtils.copyProperties(message,online);
        onlineService.create(online);
        // Business handling: login authentication reply
        OnlineReply onlineReply = new OnlineReply();
        onlineReply.setCharging_pile_code(message.getCharging_pile_code());
        // NOTE(review): 0 presumably means "login accepted" — confirm against the protocol definition.
        onlineReply.setOnline_result(0);
        // The reply is addressed by the charging pile code; the payload is produced by messageUtil.onlineReply.
        String result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply));
        log.info("充电桩登录认证-返回结果:{}",result);
        // Business handling
    }
    @Override