From 1adec9fead03f0f788a73f9349ccba86569e31f3 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期三, 30 四月 2025 19:40:11 +0800
Subject: [PATCH] 修改rocketmq连接方式和修改发起充电异常情况下将订单挂起的功能

---
 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java |  179 +++++++++++++++++++----------------------------------------
 1 files changed, 59 insertions(+), 120 deletions(-)

diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java
index f3eb7bf..0cb4875 100644
--- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java
+++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java
@@ -10,8 +10,12 @@
 import com.ruoyi.integration.api.feignClient.TCECClient;
 import com.ruoyi.integration.api.model.Online;
 import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData;
+import com.ruoyi.integration.iotda.constant.SendTagConstant;
 import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService;
+import com.ruoyi.integration.rocket.model.ChargingMessage;
+import com.ruoyi.integration.rocket.model.ChargingOrderMessage;
 import com.ruoyi.integration.rocket.model.UploadRealTimeMonitoringDataMessage;
+import com.ruoyi.integration.rocket.produce.EnhanceProduce;
 import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
 import com.ruoyi.order.api.feignClient.ChargingOrderClient;
 import com.ruoyi.order.api.model.TChargingOrder;
@@ -36,10 +40,9 @@
         messageModel = MessageModel.CLUSTERING,
         consumerGroup = "charge_upload_real_time_monitoring_data",
         topic = "charge_upload_real_time_monitoring_data",
-        selectorExpression = "upload_real_time_monitoring_data",
-        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
+        selectorExpression = "upload_real_time_monitoring_data"
 )
-public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> {
+public class UploadRealTimeMonitoringDataMessageListener implements RocketMQListener<UploadRealTimeMonitoringDataMessage> {
 
     @Autowired
     private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
@@ -48,129 +51,65 @@
     private ChargingOrderClient chargingOrderClient;
     @Resource
     private AccountingStrategyDetailClient accountingStrategyDetailClient;
-    @Resource
-    private ChargingGunClient chargingGunClient;
-    @Resource
-    private FaultMessageClient faultMessageClient;
+    @Autowired
+    private EnhanceProduce enhanceProduce;
+
+
     
-    @Resource
-    private TCECClient tcecClient;
-
-
-    @Override
-    protected void handleMessage(UploadRealTimeMonitoringDataMessage message) throws Exception {
-        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
-        log.info("上传实时监测数据-业务消息处理:{}",message);
-        // 持久化消息
-        UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
-        BeanUtils.copyProperties(message,uploadRealTimeMonitoringData);
-        // 查询mogondb上一条数据
-        UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
-        // 查询订单
-        TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
-        // 查询当前时间段的计费策略
-        TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
-        uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
-        uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
-        if (Objects.nonNull(data)) {
-            uploadRealTimeMonitoringData.setLast_time(data.getLast_time());
-            uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().divide(data.getPaid_amount()));
-            uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().divide(data.getCharging_degree()));
-            uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
-        }else {
-            log.info("首次上传实时监测数据");
-            uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount());
-            uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree());
-            uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
-        }
-        uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
-        // 业务处理
-        UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
-        BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
-        chargingOrderClient.chargeMonitoring(query);
-        GetChargingGunByCode code = new GetChargingGunByCode();
-        code.setCharging_pile_code(message.getCharging_pile_code());
-        code.setCharging_gun_code(message.getCharging_gun_code());
-        TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData();
-        if(Objects.nonNull(chargingGun)){
-            // 存储状态信息
-            TFaultMessage faultMessage = new TFaultMessage();
-            if(message.getCharging_gun_status().equals(0) || message.getCharging_gun_status().equals(1)){
-                faultMessage.setSiteId(chargingGun.getSiteId());
-                faultMessage.setChargingPileId(chargingGun.getChargingPileId());
-                faultMessage.setChargingGunId(chargingGun.getId());
-                switch (message.getCharging_gun_status()){
-                    case 0:
-                        faultMessage.setStatus(1);
-                        chargingGun.setStatus(1);
-                        break;
-                    case 1:
-                        faultMessage.setStatus(2);
-                        chargingGun.setStatus(7);
-                        break;
-                }
-                faultMessage.setDownTime(LocalDateTime.now());
-                faultMessageClient.createFaultMessage(faultMessage);
-            }else {
-                switch (message.getCharging_gun_status()){
-                    case 2:
-                        chargingGun.setStatus(2);
-                        break;
-                    case 3:
-                        chargingGun.setStatus(4);
-                        break;
-                }
-                // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录
-                faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData();
-                if(Objects.nonNull(faultMessage)){
-                    faultMessage.setEndTime(LocalDateTime.now());
-                    faultMessageClient.updateFaultMessage(faultMessage);
-                }
-            }
-            chargingGunClient.updateChargingGunById(chargingGun);
-            //推送状态给三方平台
-            tcecClient.pushChargingGunStatus(chargingGun.getFullNumber(), chargingGun.getStatus());
-        }
-    }
-
-    @Override
-    protected void handleMaxRetriesExceeded(UploadRealTimeMonitoringDataMessage message) {
-        // 当超过指定重试次数消息时此处方法会被调用
-        // 生产中可以进行回退或其他业务操作
-        log.error("消息消费失败,请执行后续处理");
-    }
-
-
-    /**
-     * 是否执行重试机制
-     */
-    @Override
-    protected boolean isRetry() {
-        return true;
-    }
-
-    @Override
-    protected boolean throwException() {
-        // 是否抛出异常,false搭配retry自行处理异常
-        return false;
-    }
-
-    /**
-     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
-     * @param message 待处理消息
-     * @return true: 本次消息被过滤,false:不过滤
-     */
-    @Override
-    protected boolean filter(UploadRealTimeMonitoringDataMessage message) {
-        // 此处可做消息过滤
-        return false;
-    }
+    
+    
 
     /**
      * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
      */
     @Override
     public void onMessage(UploadRealTimeMonitoringDataMessage message) {
-        super.dispatchMessage(message);
+        try {
+            log.info("上传实时监测数据-业务消息处理:{}",message);
+            // 持久化消息
+            UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
+            BeanUtils.copyProperties(message,uploadRealTimeMonitoringData);
+            // 查询mogondb上一条数据
+            UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
+            // 查询订单
+            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
+            // 查询当前时间段的计费策略
+            TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
+            uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
+            uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
+            if (Objects.nonNull(data)) {
+                uploadRealTimeMonitoringDataService.updateById(data.getId());
+                uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().subtract(data.getPaid_amount()));
+                uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().subtract(data.getCharging_degree()));
+                uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
+            }else {
+                log.info("首次上传实时监测数据");
+                uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount());
+                uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree());
+                uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
+            }
+            uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
+            uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
+            uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
+            int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
+            if(i == 0){
+                log.error("数据存储mongo失败");
+            }
+        
+            // 业务处理
+            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
+            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
+            chargingOrderClient.chargeMonitoring(query);
+            // 订单id
+            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
+            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
+            // 推送充电订单信息
+            ChargingMessage chargingMessage4 = new ChargingMessage();
+            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
+            chargingMessage4.setOrderMessage(chargingOrderMessage3);
+            enhanceProduce.orderInfoMessage(chargingMessage4);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
 }
\ No newline at end of file

--
Gitblit v1.7.1