From 64d91e3c3414b928254d36c6a2c9cb0dd7262dcf Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期四, 17 四月 2025 18:56:24 +0800
Subject: [PATCH] 删除消息队列代码改为同步接口处理

---
 ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java              |    4 
 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java      |  535 ++++++++++++++++++++++++++++++++++++++
 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java |  264 ++++++++++++------
 3 files changed, 705 insertions(+), 98 deletions(-)

diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
index c122e98..53b3c40 100644
--- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
+++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
@@ -15,6 +15,7 @@
 import com.ruoyi.integration.iotda.utils.tools.MessageUtil;
 import com.ruoyi.integration.iotda.utils.tools.StrategyUtil;
 import com.ruoyi.integration.rocket.model.*;
+import com.ruoyi.integration.rocket.produce.ChargingMessageUtil;
 import com.ruoyi.integration.rocket.produce.EnhanceProduce;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
@@ -47,6 +48,13 @@
     private IotMessageProduce iotMessageProduce;
     @Resource
     private AccountingStrategyDetailClient accountingStrategyDetailClient;
+    
+    @Resource
+    private ChargingMessageUtil chargingMessageUtil;
+    
+    
+    
+    
     /**
      * 设备消息监听
      * @param jsonObject
@@ -77,245 +85,309 @@
             case SendTagConstant.ONLINE:
                 OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class);
                 chargingMessage.setOnlineMessage(onlineMessage);
-                // 响应硬件
-                // 业务处理 登录认证应答
-                OnlineReply onlineReply = new OnlineReply();
-                onlineReply.setCharging_pile_code(onlineMessage.getCharging_pile_code());
-                onlineReply.setOnline_result(0);
-                result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply));
-                log.info("充电桩登录认证-返回结果:{}",result);
-                // 响应硬件 对时设置应答
-                TimingSettingReply timingSettingReplyOnline = new TimingSettingReply();
-                timingSettingReplyOnline.setCharging_pile_code(onlineMessage.getCharging_pile_code());
-                timingSettingReplyOnline.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date()));
-                iotMessageProduce.sendMessage(timingSettingReplyOnline.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReplyOnline));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 响应硬件
+                        // 业务处理 登录认证应答
+                        OnlineReply onlineReply = new OnlineReply();
+                        onlineReply.setCharging_pile_code(onlineMessage.getCharging_pile_code());
+                        onlineReply.setOnline_result(0);
+                        String result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply));
+                        log.info("充电桩登录认证-返回结果:{}",result);
+                        // 响应硬件 对时设置应答
+                        TimingSettingReply timingSettingReplyOnline = new TimingSettingReply();
+                        timingSettingReplyOnline.setCharging_pile_code(onlineMessage.getCharging_pile_code());
+                        timingSettingReplyOnline.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date()));
+                        iotMessageProduce.sendMessage(timingSettingReplyOnline.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReplyOnline));
+                    }
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.PING:
                 PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class);
                 chargingMessage.setPingMessage(pingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
                 // 响应硬件
-                Pong pong = new Pong();
-                pong.setCharging_pile_code(pingMessage.getCharging_pile_code());
-                pong.setCharging_gun_code(pingMessage.getCharging_gun_code());
-                pong.setCharging_gun_status(0);
-                iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        Pong pong = new Pong();
+                        pong.setCharging_pile_code(pingMessage.getCharging_pile_code());
+                        pong.setCharging_gun_code(pingMessage.getCharging_gun_code());
+                        pong.setCharging_gun_status(0);
+                        iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong));
+                    }
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.END_CHARGE:
                 EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class);
                 chargingMessage.setEndChargeMessage(endChargeMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.ERROR_MESSAGE:
                 ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class);
                 chargingMessage.setErrorMessageMessage(errorMessageMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.BILLING_MODE_VERIFY:
                 BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class);
                 chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
                 // 响应硬件
-                BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply();
-                if(billingModeVerifyMessage.getBilling_model_code().equals("0")){
-                    // 首次
-                    billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code());
-                    billingModeVerifyReply.setBilling_model_code("0");
-                    billingModeVerifyReply.setBilling_model_result(1);
-                }else {
-                    // 查询桩使用的模版
-                    CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO();
-                    dto.setCode(billingModeVerifyMessage.getCharging_pile_code());
-                    dto.setStrategyDetailId(Integer.valueOf(billingModeVerifyMessage.getBilling_model_code()));
-                    Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData();
-                    // 校验计费模版是否准确
-                    billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code());
-                    billingModeVerifyReply.setBilling_model_code(billingModeVerifyMessage.getBilling_model_code());
-                    if(check){
-                        billingModeVerifyReply.setBilling_model_result(0);
-                    }else {
-                        billingModeVerifyReply.setBilling_model_result(1);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply();
+                        if(billingModeVerifyMessage.getBilling_model_code().equals("0")){
+                            // 首次
+                            billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code());
+                            billingModeVerifyReply.setBilling_model_code("0");
+                            billingModeVerifyReply.setBilling_model_result(1);
+                        }else {
+                            // 查询桩使用的模版
+                            CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO();
+                            dto.setCode(billingModeVerifyMessage.getCharging_pile_code());
+                            dto.setStrategyDetailId(Integer.valueOf(billingModeVerifyMessage.getBilling_model_code()));
+                            Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData();
+                            // 校验计费模版是否准确
+                            billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code());
+                            billingModeVerifyReply.setBilling_model_code(billingModeVerifyMessage.getBilling_model_code());
+                            if(check){
+                                billingModeVerifyReply.setBilling_model_result(0);
+                            }else {
+                                billingModeVerifyReply.setBilling_model_result(1);
+                            }
+                        }
+                        iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply));
                     }
-                }
-                iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.ACQUISITION_BILLING_MODE:
                 AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class);
                 chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
                 // 响应硬件   计费模型请求应答  1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段
-                List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData();
-                Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails);
-                // 价格设置
-                AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply();
-                StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply);
-                // 时段设置
-                StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply);
-                TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData();
-                acquisitionBillingModeReply.setBilling_model_code(accountingStrategyDetail.getId().toString());
-                acquisitionBillingModeReply.setCharging_pile_code(acquisitionBillingModeMessage.getCharging_pile_code());
-                acquisitionBillingModeReply.setLoss_ratio(BigDecimal.ZERO);
-                iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData();
+                        Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails);
+                        // 价格设置
+                        AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply();
+                        StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply);
+                        // 时段设置
+                        StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply);
+                        TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData();
+                        acquisitionBillingModeReply.setBilling_model_code(accountingStrategyDetail.getId().toString());
+                        acquisitionBillingModeReply.setCharging_pile_code(acquisitionBillingModeMessage.getCharging_pile_code());
+                        acquisitionBillingModeReply.setLoss_ratio(BigDecimal.ZERO);
+                        iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply));
+                    }
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                 log.info("充电实时数据上传");
                 UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class);
                 chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.CHARGING_HANDSHAKE:
                 ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class);
                 chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.PARAMETER_SETTING:
                 ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class);
                 chargingMessage.setParameterSettingMessage(parameterSettingMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.BMS_ABORT:
                 BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class);
                 chargingMessage.setBmsAbortMessage(bmsAbortMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.MOTOR_ABORT:
                 MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class);
                 chargingMessage.setMotorAbortMessage(motorAbortMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                 BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class);
                 chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.BMS_INFORMATION:
                 BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class);
                 chargingMessage.setBmsInformationMessage(bmsInformationMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                 ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class);
                 chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
                 // 响应硬件
-                PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging();
-                platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code());
-                platformConfirmationCharging.setCharging_gun_code(chargingPileStartsChargingMessage.getCharging_gun_code());
-                platformConfirmationCharging.setCard_number(chargingPileStartsChargingMessage.getAccount());
-                platformConfirmationCharging.setAccount_balance(BigDecimal.ZERO);
-                platformConfirmationCharging.setAuthentication(1);
-                // TODO 若是失败,给出失败原因
-                iotMessageProduce.sendMessage(platformConfirmationCharging.getCharging_pile_code(), ServiceIdMenu.PLATFORM_CONFIRMATION_CHARGING.getKey(),messageUtil.platformConfirmationCharging(platformConfirmationCharging));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging();
+                        platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code());
+                        platformConfirmationCharging.setCharging_gun_code(chargingPileStartsChargingMessage.getCharging_gun_code());
+                        platformConfirmationCharging.setCard_number(chargingPileStartsChargingMessage.getAccount());
+                        platformConfirmationCharging.setAccount_balance(BigDecimal.ZERO);
+                        platformConfirmationCharging.setAuthentication(1);
+                        // TODO 若是失败,给出失败原因
+                        iotMessageProduce.sendMessage(platformConfirmationCharging.getCharging_pile_code(), ServiceIdMenu.PLATFORM_CONFIRMATION_CHARGING.getKey(),messageUtil.platformConfirmationCharging(platformConfirmationCharging));
+                    }
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                 PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class);
                 chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                 PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class);
                 chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.TRANSACTION_RECORD:
                 TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class);
                 transactionRecordMessage.setResult(content.toJSONString());
                 chargingMessage.setTransactionRecordMessage(transactionRecordMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
                 // 响应硬件
-                ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord();
-                confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number());
-                confirmTransactionRecord.setConfirm_result(0);
-                iotMessageProduce.sendMessage(transactionRecordMessage.getCharging_pile_code(), ServiceIdMenu.CONFIRM_TRANSACTION_RECORD.getKey(),messageUtil.confirmTransactionRecord(confirmTransactionRecord));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord();
+                        confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number());
+                        confirmTransactionRecord.setConfirm_result(0);
+                        iotMessageProduce.sendMessage(transactionRecordMessage.getCharging_pile_code(), ServiceIdMenu.CONFIRM_TRANSACTION_RECORD.getKey(),messageUtil.confirmTransactionRecord(confirmTransactionRecord));
+                    }
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.UPDATE_BALANCE_REPLY:
                 UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class);
                 chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                 SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class);
                 chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                 ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class);
                 chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                 WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class);
                 chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.TIMING_SETTING:
                 TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class);
                 chargingMessage.setTimingSettingMessage(timingSettingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
                 // 响应硬件 对时设置应答
-                TimingSettingReply timingSettingReply = new TimingSettingReply();
-                timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code());
-                timingSettingReply.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date()));
-                iotMessageProduce.sendMessage(timingSettingReply.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReply));
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        TimingSettingReply timingSettingReply = new TimingSettingReply();
+                        timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code());
+                        timingSettingReply.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date()));
+                        iotMessageProduce.sendMessage(timingSettingReply.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReply));
+                    }
+                }).start();
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 break;
             case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                 SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class);
                 chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                 GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class);
                 chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                 ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class);
                 chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.PLATFORM_RESTART_REPLY:
                 PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class);
                 chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                 QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class);
                 chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             case SendTagConstant.SECURITY_DETECTION:
                 SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class);
                 chargingMessage.setSecurityDetectionMessage(securityDetectionMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
             default:
                 PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class);
                 chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage);
-                sendResult = enhanceProduce.chargingMessage(chargingMessage);
+                chargingMessageUtil.handleMessage(chargingMessage);
+//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                 // 响应硬件
                 break;
         }
-        log.info("rocketmq消息下发结果:{}",sendResult);
+//        log.info("rocketmq消息下发结果:{}",sendResult);
         return AjaxResult.success();
     }
 
diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
new file mode 100644
index 0000000..60bd13f
--- /dev/null
+++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
@@ -0,0 +1,535 @@
+package com.ruoyi.integration.rocket.produce;
+
+import com.alibaba.fastjson.JSONObject;
+import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
+import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
+import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
+import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
+import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
+import com.ruoyi.integration.api.model.*;
+import com.ruoyi.integration.drainage.TCECPushUtil;
+import com.ruoyi.integration.iotda.constant.SendTagConstant;
+import com.ruoyi.integration.mongodb.service.*;
+import com.ruoyi.integration.rocket.model.*;
+import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
+import com.ruoyi.order.api.feignClient.ChargingOrderClient;
+import com.ruoyi.order.api.model.TChargingOrder;
+import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
+import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
+import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
+import com.ruoyi.order.api.vo.SecurityDetectionVO;
+import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.Resource;
+import java.math.RoundingMode;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+@Slf4j
+@Component
+public class ChargingMessageUtil {
+
+    @Autowired
+    private AcquisitionBillingModeService acquisitionBillingModeService;
+    @Autowired
+    private BillingModeVerifyService billingModeVerifyService;
+    @Autowired
+    private BmsAbortService bmsAbortService;
+    @Resource
+    private ChargingOrderClient chargingOrderClient;
+    @Autowired
+    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
+    @Autowired
+    private OnlineService onlineService;
+    @Autowired
+    private PingService pingService;
+    @Autowired
+    private EndChargeService endChargeService;
+    @Autowired
+    private ErrorMessageMessageService errorMessageMessageService;
+    @Autowired
+    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
+    @Resource
+    private AccountingStrategyDetailClient accountingStrategyDetailClient;
+    @Autowired
+    private ChargingHandshakeService chargingHandshakeService;
+    @Autowired
+    private ParameterSettingService parameterSettingService;
+    @Autowired
+    private MotorAbortService motorAbortService;
+    @Autowired
+    private BmsInformationService bmsInformationService;
+    @Autowired
+    private ChargingPileStartsChargingService chargingPileStartsChargingService;
+    @Autowired
+    private PlatformStartChargingReplyService platformStartChargingReplyService;
+    @Autowired
+    private PlatformStopChargingReplyService platformStopChargingReplyService;
+    @Autowired
+    private TransactionRecordService transactionRecordService;
+    @Autowired
+    private UpdateBalanceReplyService updateBalanceReplyService;
+    @Autowired
+    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
+    @Autowired
+    private ClearOfflineCardReplyService clearOfflineCardReplyService;
+    @Autowired
+    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
+    @Autowired
+    private TimingSettingService timingSettingService;
+    @Autowired
+    private SetupBillingModelReplyService setupBillingModelReplyService;
+    @Autowired
+    private GroundLockRealTimeDataService groundLockRealTimeDataService;
+    @Autowired
+    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
+    @Autowired
+    private PlatformRestartReplyService platformRestartReplyService;
+    @Autowired
+    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
+    @Autowired
+    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
+    @Autowired
+    private SecurityDetectionService securityDetectionService;
+    @Autowired
+    private TCECPushUtil tcecPushUtil;
+
+    @Resource
+    private ChargingPileClient chargingPileClient;
+    @Resource
+    private ChargingGunClient chargingGunClient;
+
+    @Resource
+    private RedisTemplate redisTemplate;
+    
+    
+    
+    
+    
+
+    public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){
+        log.info("rocket收到的消息内容:{}",message);
+        String serviceId = message.getServiceId();
+        if(!StringUtils.hasLength(serviceId)){
+            return;
+        }
+        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
+        switch (serviceId){
+            case SendTagConstant.ONLINE:
+                OnlineMessage onlineMessage = message.getOnlineMessage();
+                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
+                // 持久化消息
+                Online online = new Online();
+                BeanUtils.copyProperties(onlineMessage,online);
+                onlineService.create(online);
+                break;
+            case SendTagConstant.PING:
+                PingMessage pingMessage = message.getPingMessage();
+                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
+                // 持久化消息
+                Ping ping = new Ping();
+                BeanUtils.copyProperties(pingMessage,ping);
+                pingService.save(ping);
+                //存储缓存中,5分钟有效
+                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
+                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
+                        vo1.setGun_code(pingMessage.getCharging_gun_code());
+                        vo1.setPile_code(pingMessage.getCharging_pile_code());
+                        vo1.setStatus(pingMessage.getCharging_gun_status());
+                        chargingPileClient.updateChargingPileStatus(vo1);
+    
+                        try {
+                            tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData());
+                        }catch (Exception e){
+                            e.printStackTrace();
+                            System.out.println("设备状态推送监管平台失败:"+e.getMessage());
+                        }
+                    }
+                });
+                break;
+            case SendTagConstant.END_CHARGE:
+                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
+                log.info("充电结束-业务消息处理:{}",endChargeMessage);
+                // 持久化消息
+                EndCharge endCharge = new EndCharge();
+                BeanUtils.copyProperties(endChargeMessage,endCharge);
+                endChargeService.create(endCharge);
+                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor1.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
+                        try {
+                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
+                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
+                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
+                        }catch (Exception e){
+                            e.printStackTrace();
+                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
+                        }
+                    }
+                });
+                break;
+            case SendTagConstant.ERROR_MESSAGE:
+                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
+                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
+                // 持久化消息
+                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
+                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
+                errorMessageMessageService.create(errorMessageMessage);
+                break;
+            case SendTagConstant.BILLING_MODE_VERIFY:
+                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
+                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
+                // 持久化消息
+                BillingModeVerify billingModeVerify = new BillingModeVerify();
+                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
+                billingModeVerifyService.create(billingModeVerify);
+                break;
+            case SendTagConstant.ACQUISITION_BILLING_MODE:
+                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
+                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
+                // 持久化消息
+                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
+                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
+                acquisitionBillingModeService.create(acquisitionBillingMode);
+                break;
+            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
+                try {
+                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
+                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
+                    // 持久化消息
+                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
+                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
+                    // 查询mogondb上一条数据
+                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
+                    // 查询订单
+                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
+                    // 查询当前时间段的计费策略
+                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
+                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
+                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
+                    if (Objects.nonNull(data)) {
+                        uploadRealTimeMonitoringDataService.updateById(data.getId());
+                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
+                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
+                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
+                    }else {
+                        log.info("首次上传实时监测数据");
+                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
+                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
+                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
+                    }
+                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
+                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
+                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
+//                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
+//                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
+                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
+                    if(i == 0){
+                        log.error("数据存储mongo失败");
+                    }
+    
+                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                    threadPoolExecutor2.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            // 业务处理
+                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
+                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
+                            chargingOrderClient.chargeMonitoring(query);
+                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
+    
+                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
+                        }
+                    });
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                break;
+            case SendTagConstant.CHARGING_HANDSHAKE:
+                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
+                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
+                // 持久化消息
+                ChargingHandshake chargingHandshake = new ChargingHandshake();
+                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
+                chargingHandshakeService.create(chargingHandshake);
+                break;
+            case SendTagConstant.PARAMETER_SETTING:
+                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
+                log.info("业务消息处理:{}",parameterSettingMessage);
+                // 持久化消息
+                ParameterSetting parameterSetting = new ParameterSetting();
+                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
+                parameterSettingService.create(parameterSetting);
+                break;
+            case SendTagConstant.BMS_ABORT:
+                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
+                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
+                // 持久化消息
+                BmsAbort bmsAbort = new BmsAbort();
+                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
+                bmsAbortService.create(bmsAbort);
+    
+                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor3.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
+                    }
+                });
+                break;
+            case SendTagConstant.MOTOR_ABORT:
+                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
+                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
+                // 持久化消息
+                MotorAbort motorAbort = new MotorAbort();
+                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
+                motorAbortService.create(motorAbort);
+                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor4.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
+                    }
+                });
+                break;
+            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
+                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
+                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
+                // 持久化消息
+                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
+                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
+                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
+                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor5.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
+                        if(Objects.nonNull(chargingOrderBms)){
+                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
+                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
+                        }
+                    }
+                });
+                break;
+            case SendTagConstant.BMS_INFORMATION:
+                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
+                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
+                // 持久化消息
+                BmsInformation bmsInformation = new BmsInformation();
+                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
+                bmsInformationService.create(bmsInformation);
+                break;
+            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
+                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
+                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
+                // 持久化消息
+                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
+                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
+                chargingPileStartsChargingService.create(chargingPileStartsCharging);
+                break;
+            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
+                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
+                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
+                // 持久化消息
+                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
+                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
+                platformStartChargingReplyService.create(platformStartChargingReply);
+                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor6.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
+                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
+                        chargingOrderClient.startChargeSuccessfully(message1);
+                    }
+                });
+                break;
+            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
+                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
+                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
+                // 持久化消息
+                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
+                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
+                platformStopChargingReplyService.create(platformStopChargingReply);
+                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor7.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
+                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
+                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
+                    }
+                });
+                break;
+            case SendTagConstant.TRANSACTION_RECORD:
+                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
+                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
+                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
+                // 持久化消息
+                TransactionRecord transactionRecord = new TransactionRecord();
+                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
+                transactionRecordService.create(transactionRecord);
+                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor8.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // 业务处理
+                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
+                        if(Objects.nonNull(chargingOrderRecord)){
+                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
+                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
+                        }
+                        //计算费用
+                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
+                        BeanUtils.copyProperties(transactionRecordMessage,vo);
+                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
+                        if(200 != code){
+                            //失败后添加到队列中继续处理数据
+                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
+                        }
+                    }
+                });
+    
+    
+                // 添加实时上传记录结束记录
+                // 查询mogondb上一条数据
+                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
+                if(Objects.nonNull(data) && data.getStatus() != 5){
+                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
+                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
+                    uploadRealTimeMonitoringData.setStatus(5);
+                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
+                }
+                break;
+            case SendTagConstant.UPDATE_BALANCE_REPLY:
+                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
+                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
+                // 持久化消息
+                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
+                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
+                updateBalanceReplyService.create(updateBalanceReply);
+                break;
+            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
+                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
+                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
+                // 持久化消息
+                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
+                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
+                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
+                break;
+            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
+                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
+                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
+                // 持久化消息
+                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
+                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
+                clearOfflineCardReplyService.create(clearOfflineCardReply);
+                break;
+            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
+                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
+                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
+                // 持久化消息
+                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
+                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
+                workingParameterSettingReplyService.create(workingParameterSettingReply);
+                break;
+            case SendTagConstant.TIMING_SETTING:
+                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
+                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
+                // 持久化消息
+                TimingSetting timingSetting = new TimingSetting();
+                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
+                timingSettingService.create(timingSetting);
+                break;
+            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
+                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
+                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
+                // 持久化消息
+                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
+                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
+                setupBillingModelReplyService.create(setupBillingModelReply);
+                break;
+            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
+                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
+                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
+                // 持久化消息
+                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
+                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
+                groundLockRealTimeDataService.create(groundLockRealTimeData);
+                break;
+            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
+                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
+                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
+                // 持久化消息
+                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
+                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
+                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
+                break;
+            case SendTagConstant.PLATFORM_RESTART_REPLY:
+                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
+                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
+                // 持久化消息
+                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
+                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
+                platformRestartReplyService.create(platformRestartReply);
+                break;
+            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
+                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
+                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
+                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
+                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
+                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
+                break;
+            case SendTagConstant.SECURITY_DETECTION:
+                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
+                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
+                SecurityDetection securityDetection = new SecurityDetection();
+                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
+                securityDetectionService.create(securityDetection);
+                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+                threadPoolExecutor9.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
+                        BeanUtils.copyProperties(securityDetection, securityDetection1);
+                        chargingOrderClient.securityDetection(securityDetection1);
+                    }
+                });
+                break;
+            default:
+                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
+                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
+                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
+                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
+                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
+                break;
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java
index 282599d..09cb9c0 100644
--- a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java
+++ b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java
@@ -731,7 +731,7 @@
 			Integer counter = counter_map.get(code);
 			PreChargeCheck preChargeCheck1 = redisService.getCacheObject(key);
 			//5分钟内还未插枪则取消充电,退回金额。
-			if(failure_cause == 5 && (null == counter || counter < 300)){
+			if(failure_cause == 5 && (null == counter || counter < 1800)){
 				counter = (null == counter ? 0 : counter) + 1;
 				counter_map.put(code, counter);
 				//启动失败
@@ -772,7 +772,7 @@
 			log.info(code + ":-------------------未上传开启充电结果-------------------" + counter);
 			PreChargeCheck preChargeCheck1 = redisService.getCacheObject(key);
 			//5分钟内未启动成功,退回金额。
-			if(null == counter || counter < 300){
+			if(null == counter || counter < 1800){
 				counter = (null == counter ? 0 : counter) + 1;
 				boot_failed_map.put(code, counter);
 				//启动失败

--
Gitblit v1.7.1