From bed62f9779da7b3f3d4e555438b8be9af438bf54 Mon Sep 17 00:00:00 2001
From: 无关风月 <443237572@qq.com>
Date: 星期五, 18 四月 2025 09:21:41 +0800
Subject: [PATCH] Merge branch 'master' of http://120.76.84.145:10101/gitblit/r/java/mx_charging_pile

---
 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java |  159 ++++++++++++++++++++++++++++++++++------------------
 1 files changed, 103 insertions(+), 56 deletions(-)

diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
index ce95aeb..d4db2fc 100644
--- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
+++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
@@ -45,6 +45,8 @@
 import java.util.Date;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -162,15 +164,16 @@
                 pingService.save(ping);
                 //存储缓存中,5分钟有效
                 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
-                
-                UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
-                vo1.setGun_code(pingMessage.getCharging_gun_code());
-                vo1.setPile_code(pingMessage.getCharging_pile_code());
-                vo1.setStatus(pingMessage.getCharging_gun_status());
-                chargingPileClient.updateChargingPileStatus(vo1);
-                new Thread(new Runnable() {
+                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
+                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
+                        vo1.setGun_code(pingMessage.getCharging_gun_code());
+                        vo1.setPile_code(pingMessage.getCharging_pile_code());
+                        vo1.setStatus(pingMessage.getCharging_gun_status());
+                        chargingPileClient.updateChargingPileStatus(vo1);
+    
                         try {
                             tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData());
                         }catch (Exception e){
@@ -178,7 +181,7 @@
                             System.out.println("设备状态推送监管平台失败:"+e.getMessage());
                         }
                     }
-                }).start();
+                });
                 break;
             case SendTagConstant.END_CHARGE:
                 EndChargeMessage endChargeMessage = message.getEndChargeMessage();
@@ -187,13 +190,12 @@
                 EndCharge endCharge = new EndCharge();
                 BeanUtils.copyProperties(endChargeMessage,endCharge);
                 endChargeService.create(endCharge);
-                // 业务处理
-                chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
-                // 监管平台
-                // 查询订单信息
-                new Thread(new Runnable() {
+                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor1.execute(new Runnable() {
                     @Override
                     public void run() {
+                        // 业务处理
+                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                         try {
                             TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
                             tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
@@ -203,7 +205,7 @@
                             System.out.println("充电结束推送监管平台失败:"+e.getMessage());
                         }
                     }
-                }).start();
+                });
                 break;
             case SendTagConstant.ERROR_MESSAGE:
                 ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
@@ -264,18 +266,20 @@
                     if(i == 0){
                         log.error("数据存储mongo失败");
                     }
-                    // 业务处理
-                    UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
-                    BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
-                    chargingOrderClient.chargeMonitoring(query);
-                    chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
-                    new Thread(new Runnable() {
+    
+                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                    threadPoolExecutor2.execute(new Runnable() {
                         @Override
                         public void run() {
+                            // 业务处理
+                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
+                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
+                            chargingOrderClient.chargeMonitoring(query);
+                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
+    
                             tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
                         }
-                    }).start();
-
+                    });
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -303,8 +307,15 @@
                 BmsAbort bmsAbort = new BmsAbort();
                 BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
                 bmsAbortService.create(bmsAbort);
-                // 业务处理
-                chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
+    
+                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor3.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
+                    }
+                });
                 break;
             case SendTagConstant.MOTOR_ABORT:
                 MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
@@ -313,8 +324,14 @@
                 MotorAbort motorAbort = new MotorAbort();
                 BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                 motorAbortService.create(motorAbort);
-                // 业务处理
-                chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
+                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor4.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
+                    }
+                });
                 break;
             case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                 BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
@@ -323,12 +340,18 @@
                 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                 BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                 bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
-                // 业务处理
-                TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
-                if(Objects.nonNull(chargingOrderBms)){
-                    chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
-                    chargingOrderClient.updateChargingOrder(chargingOrderBms);
-                }
+                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor5.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
+                        if(Objects.nonNull(chargingOrderBms)){
+                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
+                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
+                        }
+                    }
+                });
                 break;
             case SendTagConstant.BMS_INFORMATION:
                 BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
@@ -353,10 +376,16 @@
                 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                 BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                 platformStartChargingReplyService.create(platformStartChargingReply);
-                // 业务处理
-                PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO();
-                BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
-                chargingOrderClient.startChargeSuccessfully(message1);
+                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor6.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO();
+                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
+                        chargingOrderClient.startChargeSuccessfully(message1);
+                    }
+                });
                 break;
             case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                 PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
@@ -365,9 +394,15 @@
                 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                 BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                 platformStopChargingReplyService.create(platformStopChargingReply);
-                PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
-                BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
-                chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
+                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor7.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
+                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
+                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
+                    }
+                });
                 break;
             case SendTagConstant.TRANSACTION_RECORD:
                 TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
@@ -377,20 +412,26 @@
                 TransactionRecord transactionRecord = new TransactionRecord();
                 BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                 transactionRecordService.create(transactionRecord);
-                // 业务处理
-                TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
-                if(Objects.nonNull(chargingOrderRecord)){
-                    chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
-                    chargingOrderClient.updateChargingOrder(chargingOrderRecord);
-                }
-                //计算费用
-                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
-                BeanUtils.copyProperties(transactionRecordMessage,vo);
-                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
-                if(200 != code){
-                    //失败后添加到队列中继续处理数据
-                    redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
-                }
+                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor8.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
+                        if(Objects.nonNull(chargingOrderRecord)){
+                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
+                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
+                        }
+                        //计算费用
+                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
+                        BeanUtils.copyProperties(transactionRecordMessage,vo);
+                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
+                        if(200 != code){
+                            //失败后添加到队列中继续处理数据
+                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
+                        }
+                    }
+                });
     
     
                 // 添加实时上传记录结束记录
@@ -488,9 +529,15 @@
                 SecurityDetection securityDetection = new SecurityDetection();
                 BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                 securityDetectionService.create(securityDetection);
-                SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
-                BeanUtils.copyProperties(securityDetection, securityDetection1);
-                chargingOrderClient.securityDetection(securityDetection1);
+                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor9.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
+                        BeanUtils.copyProperties(securityDetection, securityDetection1);
+                        chargingOrderClient.securityDetection(securityDetection1);
+                    }
+                });
                 break;
             default:
                 PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();

--
Gitblit v1.7.1