Pu Zhibing
2025-04-30 1adec9fead03f0f788a73f9349ccba86569e31f3
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java
@@ -10,8 +10,12 @@
import com.ruoyi.integration.api.feignClient.TCECClient;
import com.ruoyi.integration.api.model.Online;
import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService;
import com.ruoyi.integration.rocket.model.ChargingMessage;
import com.ruoyi.integration.rocket.model.ChargingOrderMessage;
import com.ruoyi.integration.rocket.model.UploadRealTimeMonitoringDataMessage;
import com.ruoyi.integration.rocket.produce.EnhanceProduce;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
@@ -36,10 +40,9 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_upload_real_time_monitoring_data",
        topic = "charge_upload_real_time_monitoring_data",
        selectorExpression = "upload_real_time_monitoring_data",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "upload_real_time_monitoring_data"
)
public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> {
public class UploadRealTimeMonitoringDataMessageListener implements RocketMQListener<UploadRealTimeMonitoringDataMessage> {
    // Service from the integration mongodb package; this listener uses it to read the last
    // stored record of a transaction (getLastDataById) and to store new records (create).
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
@@ -48,129 +51,65 @@
    // Order-service client: used here to look up the order by its code and to forward
    // monitoring data through chargeMonitoring.
    private ChargingOrderClient chargingOrderClient;
    // Used to fetch the accounting strategy detail of a site (getDetailBySiteId).
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    // Used to look up a charging gun by pile/gun code and to update it.
    @Resource
    private ChargingGunClient chargingGunClient;
    // Used to create, query and update fault message records of a charging gun.
    @Resource
    private FaultMessageClient faultMessageClient;
    // Message producer; used here to publish the order info message (orderInfoMessage).
    @Autowired
    private EnhanceProduce enhanceProduce;
    
    // Used to push the charging gun status to the third-party platform (pushChargingGunStatus).
    @Resource
    private TCECClient tcecClient;
    @Override
    protected void handleMessage(UploadRealTimeMonitoringDataMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("上传实时监测数据-业务消息处理:{}",message);
        // 持久化消息
        UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
        BeanUtils.copyProperties(message,uploadRealTimeMonitoringData);
        // 查询mogondb上一条数据
        UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
        // 查询订单
        TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        // 查询当前时间段的计费策略
        TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
        uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
        uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
        if (Objects.nonNull(data)) {
            uploadRealTimeMonitoringData.setLast_time(data.getLast_time());
            uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().divide(data.getPaid_amount()));
            uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().divide(data.getCharging_degree()));
            uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
        }else {
            log.info("首次上传实时监测数据");
            uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount());
            uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree());
            uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
        }
        uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
        // 业务处理
        UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
        BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
        chargingOrderClient.chargeMonitoring(query);
        GetChargingGunByCode code = new GetChargingGunByCode();
        code.setCharging_pile_code(message.getCharging_pile_code());
        code.setCharging_gun_code(message.getCharging_gun_code());
        TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData();
        if(Objects.nonNull(chargingGun)){
            // 存储状态信息
            TFaultMessage faultMessage = new TFaultMessage();
            if(message.getCharging_gun_status().equals(0) || message.getCharging_gun_status().equals(1)){
                faultMessage.setSiteId(chargingGun.getSiteId());
                faultMessage.setChargingPileId(chargingGun.getChargingPileId());
                faultMessage.setChargingGunId(chargingGun.getId());
                switch (message.getCharging_gun_status()){
                    case 0:
                        faultMessage.setStatus(1);
                        chargingGun.setStatus(1);
                        break;
                    case 1:
                        faultMessage.setStatus(2);
                        chargingGun.setStatus(7);
                        break;
                }
                faultMessage.setDownTime(LocalDateTime.now());
                faultMessageClient.createFaultMessage(faultMessage);
            }else {
                switch (message.getCharging_gun_status()){
                    case 2:
                        chargingGun.setStatus(2);
                        break;
                    case 3:
                        chargingGun.setStatus(4);
                        break;
                }
                // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录
                faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData();
                if(Objects.nonNull(faultMessage)){
                    faultMessage.setEndTime(LocalDateTime.now());
                    faultMessageClient.updateFaultMessage(faultMessage);
                }
            }
            chargingGunClient.updateChargingGunById(chargingGun);
            //推送状态给三方平台
            tcecClient.pushChargingGunStatus(chargingGun.getFullNumber(), chargingGun.getStatus());
        }
    }
    /**
     * Hook for a message that has exceeded the allowed number of retries.
     * It currently only logs an error; the message itself is not inspected.
     *
     * @param message the message whose consumption failed
     */
    @Override
    protected void handleMaxRetriesExceeded(UploadRealTimeMonitoringDataMessage message) {
        // This method is called once a message exceeds the configured retry count.
        // In production, a rollback or another business action could be performed here.
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * Whether the retry mechanism should be executed.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    /**
     * Whether a processing exception should be thrown.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean throwException() {
        // Whether to throw the exception; false is combined with retry so exceptions are handled by the handler itself.
        return false;
    }
    /**
     * Message filter hook. If filtering is needed, it can be handled uniformly in the parent,
     * or implemented here and handled by this class. This implementation filters nothing.
     *
     * @param message the message about to be processed
     * @return true: the message is filtered out; false: the message is not filtered
     */
    @Override
    protected boolean filter(UploadRealTimeMonitoringDataMessage message) {
        // Message filtering could be done here.
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(UploadRealTimeMonitoringDataMessage message) {
        super.dispatchMessage(message);
        try {
            log.info("上传实时监测数据-业务消息处理:{}",message);
            // 持久化消息
            UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
            BeanUtils.copyProperties(message,uploadRealTimeMonitoringData);
            // 查询mogondb上一条数据
            UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
            // 查询订单
            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
            // 查询当前时间段的计费策略
            TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
            uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
            uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
            if (Objects.nonNull(data)) {
                uploadRealTimeMonitoringDataService.updateById(data.getId());
                uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().subtract(data.getPaid_amount()));
                uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().subtract(data.getCharging_degree()));
                uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
            }else {
                log.info("首次上传实时监测数据");
                uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount());
                uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree());
                uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
            }
            uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
            uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
            uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
            int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
            if(i == 0){
                log.error("数据存储mongo失败");
            }
            // 业务处理
            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
            chargingOrderClient.chargeMonitoring(query);
            // 订单id
            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
            // 推送充电订单信息
            ChargingMessage chargingMessage4 = new ChargingMessage();
            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
            chargingMessage4.setOrderMessage(chargingOrderMessage3);
            enhanceProduce.orderInfoMessage(chargingMessage4);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}