luodangjia
2024-09-11 71d710e252dccfaf35804c5d0e6a3f00dac9be2b
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java
@@ -5,25 +5,33 @@
import com.ruoyi.integration.mongodb.service.BmsDemandAndChargerExportationService;
import com.ruoyi.integration.rocket.model.BmsDemandAndChargerExportationMessage;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_bms_demand_and_charger_exportation",
        topic = "charge_bms_demand_and_charger_exportation",
        selectorExpression = "bms_demand_and_charger_exportation",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class BmsDemandAndChargerExportationMessageListener extends EnhanceMessageHandler<BmsDemandAndChargerExportationMessage> implements RocketMQListener<BmsDemandAndChargerExportationMessage> {
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    private ChargingOrderClient chargingOrderClient;
    /**
     * Handles a {@link BmsDemandAndChargerExportationMessage}.
     *
     * <p>Visible steps: the message's properties are copied onto
     * {@code bmsDemandAndChargerExportation} and passed to
     * {@code bmsDemandAndChargerExportationService.create(...)}; then the charging order
     * identified by the message's transaction serial number is fetched through
     * {@code chargingOrderClient} and, when one is returned, its {@code needElec} field is set
     * from the message's {@code bms_current_requirements} value and the order is sent back via
     * {@code updateChargingOrder}.
     *
     * <p>NOTE(review): part of this method body is not visible in this excerpt (including the
     * declaration of {@code bmsDemandAndChargerExportation}) — confirm against the full file.
     *
     * @param message the message being consumed
     * @throws Exception declared by the signature; per the original inline comment below,
     *                   failures other than resource-closing ones are meant to be left to the
     *                   parent class's retry handling
     */
    @Override
    protected void handleMessage(BmsDemandAndChargerExportationMessage message) throws Exception {
        // This is where the final business processing happens; the code here only needs to handle resource-closing exceptions, everything else can be left to the parent class to retry
@@ -33,6 +41,11 @@
        // Copy the message's properties onto the target object and hand it to the service's create(...)
        BeanUtils.copyProperties(message,bmsDemandAndChargerExportation);
        bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
        // Business processing
        // NOTE(review): the object returned by getOrderByCode(...) is dereferenced with getData()
        // without a null check — confirm the client never returns null (e.g. from a fallback)
        TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        // Only update when an order was found for this transaction serial number
        if(Objects.nonNull(chargingOrder)){
            chargingOrder.setNeedElec(message.getBms_current_requirements());
            chargingOrderClient.updateChargingOrder(chargingOrder);
        }
    }
    @Override