From 8d6ef24217033e13b356502f2ade8737a43cce2b Mon Sep 17 00:00:00 2001
From: 无关风月 <443237572@qq.com>
Date: 星期六, 19 四月 2025 17:52:15 +0800
Subject: [PATCH] MQ消息队列监管平台数据上传

---
 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java |  222 ++++++++++++++++++++++++++----------------------------
 1 files changed, 107 insertions(+), 115 deletions(-)

diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
index 60bd13f..8072fab 100644
--- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
+++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
@@ -5,6 +5,7 @@
 import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
 import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
 import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
+import com.ruoyi.chargingPile.api.model.TChargingGun;
 import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
 import com.ruoyi.integration.api.model.*;
 import com.ruoyi.integration.drainage.TCECPushUtil;
@@ -20,6 +21,7 @@
 import com.ruoyi.order.api.vo.SecurityDetectionVO;
 import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
@@ -115,7 +117,8 @@
 
     @Resource
     private RedisTemplate redisTemplate;
-    
+    @Autowired
+    private EnhanceProduce enhanceProduce;
     
     
     
@@ -146,24 +149,22 @@
                 pingService.save(ping);
                 //存储缓存中,5分钟有效
                 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
-                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
-                        vo1.setGun_code(pingMessage.getCharging_gun_code());
-                        vo1.setPile_code(pingMessage.getCharging_pile_code());
-                        vo1.setStatus(pingMessage.getCharging_gun_status());
-                        chargingPileClient.updateChargingPileStatus(vo1);
     
-                        try {
-                            tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData());
-                        }catch (Exception e){
-                            e.printStackTrace();
-                            System.out.println("设备状态推送监管平台失败:"+e.getMessage());
-                        }
-                    }
-                });
+                UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
+                vo1.setGun_code(pingMessage.getCharging_gun_code());
+                vo1.setPile_code(pingMessage.getCharging_pile_code());
+                vo1.setStatus(pingMessage.getCharging_gun_status());
+                chargingPileClient.updateChargingPileStatus(vo1);
+                // 监管平台推送充电设备状态
+                SendResult sendResult;
+                String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code();
+                ChargingMessage chargingMessage = new ChargingMessage();
+                chargingMessage.setServiceId(SendTagConstant.GUN_STATUS);
+                GunStatusMessage gunStatusMessage = new GunStatusMessage();
+                gunStatusMessage.setFullNumber(gunCode);
+                chargingMessage.setGunStatusMessage(gunStatusMessage);
+                sendResult = enhanceProduce.gunStatusMessage(chargingMessage);
+
                 break;
             case SendTagConstant.END_CHARGE:
                 EndChargeMessage endChargeMessage = message.getEndChargeMessage();
@@ -172,22 +173,36 @@
                 EndCharge endCharge = new EndCharge();
                 BeanUtils.copyProperties(endChargeMessage,endCharge);
                 endChargeService.create(endCharge);
-                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor1.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        // 业务处理
-                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
-                        try {
-                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
-                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
-                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
-                        }catch (Exception e){
-                            e.printStackTrace();
-                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
-                        }
-                    }
-                });
+                // 业务处理
+                chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
+                // 订单id
+                String transactionSerialNumber = endCharge.getTransaction_serial_number();
+                ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
+                chargingOrderMessage.setOrderNumber(transactionSerialNumber);
+                // 推送充电订单信息
+                ChargingMessage chargingMessage1 = new ChargingMessage();
+                chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
+                chargingMessage1.setOrderMessage(chargingOrderMessage);
+                enhanceProduce.orderInfoMessage(chargingMessage1);
+                // 推送充电订单状态
+                ChargingMessage chargingMessage2 = new ChargingMessage();
+                chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
+                chargingMessage2.setOrderMessage(chargingOrderMessage);
+                enhanceProduce.orderStatusMessage(chargingMessage2);
+//                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+//                threadPoolExecutor1.execute(new Runnable() {
+//                    @Override
+//                    public void run() {
+//                        try {
+//                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
+//                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
+//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
+//                        }catch (Exception e){
+//                            e.printStackTrace();
+//                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
+//                        }
+//                    }
+//                });
                 break;
             case SendTagConstant.ERROR_MESSAGE:
                 ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
@@ -249,19 +264,27 @@
                         log.error("数据存储mongo失败");
                     }
     
-                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                    threadPoolExecutor2.execute(new Runnable() {
-                        @Override
-                        public void run() {
-                            // 业务处理
-                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
-                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
-                            chargingOrderClient.chargeMonitoring(query);
-                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
-    
-                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
-                        }
-                    });
+                    // 业务处理
+                    UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
+                    BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
+                    chargingOrderClient.chargeMonitoring(query);
+                    // 订单id
+                    ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
+                    chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
+                    // 推送充电订单信息
+                    ChargingMessage chargingMessage4 = new ChargingMessage();
+                    chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
+                    chargingMessage4.setOrderMessage(chargingOrderMessage3);
+                    enhanceProduce.orderInfoMessage(chargingMessage4);
+//                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+//                    threadPoolExecutor2.execute(new Runnable() {
+//                        @Override
+//                        public void run() {
+//                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
+//
+//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
+//                        }
+//                    });
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -306,14 +329,8 @@
                 MotorAbort motorAbort = new MotorAbort();
                 BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                 motorAbortService.create(motorAbort);
-                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor4.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        // 业务处理
-                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
-                    }
-                });
+                // 业务处理
+                chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                 break;
             case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                 BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
@@ -322,18 +339,13 @@
                 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                 BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                 bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
-                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor5.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        // 业务处理
-                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
-                        if(Objects.nonNull(chargingOrderBms)){
-                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
-                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
-                        }
-                    }
-                });
+    
+                // 业务处理
+                TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
+                if(Objects.nonNull(chargingOrderBms)){
+                    chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
+                    chargingOrderClient.updateChargingOrder(chargingOrderBms);
+                }
                 break;
             case SendTagConstant.BMS_INFORMATION:
                 BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
@@ -358,16 +370,11 @@
                 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                 BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                 platformStartChargingReplyService.create(platformStartChargingReply);
-                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor6.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        // 业务处理
-                        PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
-                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
-                        chargingOrderClient.startChargeSuccessfully(message1);
-                    }
-                });
+    
+                // 业务处理
+                PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
+                BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
+                chargingOrderClient.startChargeSuccessfully(message1);
                 break;
             case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                 PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
@@ -376,15 +383,10 @@
                 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                 BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                 platformStopChargingReplyService.create(platformStopChargingReply);
-                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor7.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
-                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
-                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
-                    }
-                });
+    
+                PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
+                BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
+                chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
                 break;
             case SendTagConstant.TRANSACTION_RECORD:
                 TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
@@ -394,26 +396,21 @@
                 TransactionRecord transactionRecord = new TransactionRecord();
                 BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                 transactionRecordService.create(transactionRecord);
-                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor8.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        // 业务处理
-                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
-                        if(Objects.nonNull(chargingOrderRecord)){
-                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
-                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
-                        }
-                        //计算费用
-                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
-                        BeanUtils.copyProperties(transactionRecordMessage,vo);
-                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
-                        if(200 != code){
-                            //失败后添加到队列中继续处理数据
-                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
-                        }
-                    }
-                });
+    
+                // 业务处理
+                TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
+                if(Objects.nonNull(chargingOrderRecord)){
+                    chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
+                    chargingOrderClient.updateChargingOrder(chargingOrderRecord);
+                }
+                //计算费用
+                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
+                BeanUtils.copyProperties(transactionRecordMessage,vo);
+                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
+                if(200 != code){
+                    //失败后添加到队列中继续处理数据
+                    redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
+                }
     
     
                 // 添加实时上传记录结束记录
@@ -511,15 +508,10 @@
                 SecurityDetection securityDetection = new SecurityDetection();
                 BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                 securityDetectionService.create(securityDetection);
-                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-                threadPoolExecutor9.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
-                        BeanUtils.copyProperties(securityDetection, securityDetection1);
-                        chargingOrderClient.securityDetection(securityDetection1);
-                    }
-                });
+    
+                SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
+                BeanUtils.copyProperties(securityDetection, securityDetection1);
+                chargingOrderClient.securityDetection(securityDetection1);
                 break;
             default:
                 PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();

--
Gitblit v1.7.1