xuhy
2025-05-06 a561068a61bd142318913f2e12e58d44542d9c7e
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
@@ -1,628 +1,625 @@
package com.ruoyi.integration.rocket.produce;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient;
import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
import com.ruoyi.chargingPile.api.model.TChargingGun;
import com.ruoyi.chargingPile.api.model.TFaultMessage;
import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode;
import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
import com.ruoyi.common.redis.service.RedisService;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.integration.drainage.TCECPushUtil;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter;
import com.ruoyi.integration.mongodb.service.*;
import com.ruoyi.integration.rocket.model.*;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
import com.ruoyi.order.api.vo.SecurityDetectionVO;
import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_charging_message",
        topic = "charge_charging_message",
        selectorExpression = "charging_message",
        consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> {
    // ---------------------------------------------------------------------
    // Injected collaborators.
    // The *Service beans are presumably the message-persistence services from
    // com.ruoyi.integration.mongodb.service (wildcard import) - confirm.
    // The *Client beans are Feign clients (see the feignClient imports).
    // Per-field notes below describe only the usages visible in handleMessage;
    // fields without a note are not referenced in the visible part of it.
    // ---------------------------------------------------------------------

    // Stores AcquisitionBillingMode records (create) for ACQUISITION_BILLING_MODE messages.
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    // Stores BillingModeVerify records (create) for BILLING_MODE_VERIFY messages.
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Autowired
    private BmsAbortService bmsAbortService;
    // Order service client: endCharge, getOrderByCode and chargeMonitoring are called on it.
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    // Stores Online records (create) for ONLINE messages.
    @Autowired
    private OnlineService onlineService;
    // Stores Ping records (save) for PING (heartbeat) messages.
    @Autowired
    private PingService pingService;
    // Stores EndCharge records (create) for END_CHARGE messages.
    @Autowired
    private EndChargeService endChargeService;
    // Stores ErrorMessageMessage records (create) for ERROR_MESSAGE messages.
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    // Real-time monitoring data store: getLastDataById, updateById and create are called on it;
    // a create(...) result of 0 is logged as a failed mongo write.
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    // Looked up by the order's site id (getDetailBySiteId) to obtain the electricity price
    // and service charge applied to real-time monitoring data.
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Autowired
    private MotorAbortService motorAbortService;
    @Autowired
    private BmsInformationService bmsInformationService;
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    // NOTE(review): in the visible part of handleMessage this is referenced only from
    // commented-out code - confirm whether it is still needed.
    @Autowired
    private TCECPushUtil tcecPushUtil;
    // Receives gun/pile status updates (updateChargingPileStatus) derived from heartbeat messages.
    @Resource
    private ChargingPileClient chargingPileClient;
    @Resource
    private ChargingGunClient chargingGunClient;
    // Caches the latest Ping under "ping:" + pile code + gun code with a 5 minute expiry.
    // NOTE(review): declared as a raw type; key/value serialization depends on the bean
    // configuration, which is not visible here.
    @Resource
    private RedisTemplate redisTemplate;
    // Producer used to publish follow-up order messages (orderInfoMessage / orderStatusMessage).
    @Autowired
    private EnhanceProduce enhanceProduce;
    @StreamListener("input")
    @Override
    protected void handleMessage(ChargingMessage message) throws Exception {
        log.info("rocket收到的消息内容:{}",message);
        String serviceId = message.getServiceId();
        if(!StringUtils.hasLength(serviceId)){
            return;
        }
        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
        switch (serviceId){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = message.getOnlineMessage();
                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
                // 持久化消息
                Online online = new Online();
                BeanUtils.copyProperties(onlineMessage,online);
                onlineService.create(online);
                break;
            case SendTagConstant.PING:
                PingMessage pingMessage = message.getPingMessage();
                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
                // 持久化消息
                Ping ping = new Ping();
                BeanUtils.copyProperties(pingMessage,ping);
                pingService.save(ping);
                //存储缓存中,5分钟有效
                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
                        vo1.setGun_code(pingMessage.getCharging_gun_code());
                        vo1.setPile_code(pingMessage.getCharging_pile_code());
                        vo1.setStatus(pingMessage.getCharging_gun_status());
                        chargingPileClient.updateChargingPileStatus(vo1);
                    }
                });
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
                log.info("充电结束-业务消息处理:{}",endChargeMessage);
                // 持久化消息
                EndCharge endCharge = new EndCharge();
                BeanUtils.copyProperties(endChargeMessage,endCharge);
                endChargeService.create(endCharge);
                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor1.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                        // 订单id
                        String transactionSerialNumber = endCharge.getTransaction_serial_number();
                        ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
                        chargingOrderMessage.setOrderNumber(transactionSerialNumber);
                        // 推送充电订单信息
                        ChargingMessage chargingMessage1 = new ChargingMessage();
                        chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
                        chargingMessage1.setOrderMessage(chargingOrderMessage);
                        enhanceProduce.orderInfoMessage(chargingMessage1);
                        // 推送充电订单状态
                        ChargingMessage chargingMessage2 = new ChargingMessage();
                        chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
                        chargingMessage2.setOrderMessage(chargingOrderMessage);
                        enhanceProduce.orderStatusMessage(chargingMessage2);
//                        try {
//                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
//                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
//                        }catch (Exception e){
//                            e.printStackTrace();
//                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
//package com.ruoyi.integration.rocket.produce;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
//import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
//import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
//import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient;
//import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
//import com.ruoyi.chargingPile.api.model.TChargingGun;
//import com.ruoyi.chargingPile.api.model.TFaultMessage;
//import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode;
//import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
//import com.ruoyi.common.redis.service.RedisService;
//import com.ruoyi.integration.api.model.*;
//import com.ruoyi.integration.drainage.TCECPushUtil;
//import com.ruoyi.integration.iotda.constant.SendTagConstant;
//import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
//import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter;
//import com.ruoyi.integration.mongodb.service.*;
//import com.ruoyi.integration.rocket.model.*;
//import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
//import com.ruoyi.order.api.feignClient.ChargingOrderClient;
//import com.ruoyi.order.api.model.TChargingOrder;
//import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
//import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
//import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
//import com.ruoyi.order.api.vo.SecurityDetectionVO;
//import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.rocketmq.spring.annotation.MessageModel;
//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
//import org.apache.rocketmq.spring.core.RocketMQListener;
//import org.springframework.beans.BeanUtils;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import javax.annotation.Resource;
//import java.math.RoundingMode;
//import java.time.LocalDateTime;
//import java.util.Date;
//import java.util.Objects;
//import java.util.Set;
//import java.util.concurrent.LinkedBlockingQueue;
//import java.util.concurrent.ThreadPoolExecutor;
//import java.util.concurrent.TimeUnit;
//
//@Slf4j
//@Component
//@RocketMQMessageListener(
//        messageModel = MessageModel.CLUSTERING,
//        consumerGroup = "charge_charging_message",
//        topic = "charge_charging_message",
//        selectorExpression = "charging_message",
//        consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
//)
//public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> {
//
//    @Autowired
//    private AcquisitionBillingModeService acquisitionBillingModeService;
//    @Autowired
//    private BillingModeVerifyService billingModeVerifyService;
//    @Autowired
//    private BmsAbortService bmsAbortService;
//    @Resource
//    private ChargingOrderClient chargingOrderClient;
//    @Autowired
//    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
//    @Autowired
//    private OnlineService onlineService;
//    @Autowired
//    private PingService pingService;
//    @Autowired
//    private EndChargeService endChargeService;
//    @Autowired
//    private ErrorMessageMessageService errorMessageMessageService;
//    @Autowired
//    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
//    @Resource
//    private AccountingStrategyDetailClient accountingStrategyDetailClient;
//    @Autowired
//    private ChargingHandshakeService chargingHandshakeService;
//    @Autowired
//    private ParameterSettingService parameterSettingService;
//    @Autowired
//    private MotorAbortService motorAbortService;
//    @Autowired
//    private BmsInformationService bmsInformationService;
//    @Autowired
//    private ChargingPileStartsChargingService chargingPileStartsChargingService;
//    @Autowired
//    private PlatformStartChargingReplyService platformStartChargingReplyService;
//    @Autowired
//    private PlatformStopChargingReplyService platformStopChargingReplyService;
//    @Autowired
//    private TransactionRecordService transactionRecordService;
//    @Autowired
//    private UpdateBalanceReplyService updateBalanceReplyService;
//    @Autowired
//    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
//    @Autowired
//    private ClearOfflineCardReplyService clearOfflineCardReplyService;
//    @Autowired
//    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
//    @Autowired
//    private TimingSettingService timingSettingService;
//    @Autowired
//    private SetupBillingModelReplyService setupBillingModelReplyService;
//    @Autowired
//    private GroundLockRealTimeDataService groundLockRealTimeDataService;
//    @Autowired
//    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
//    @Autowired
//    private PlatformRestartReplyService platformRestartReplyService;
//    @Autowired
//    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
//    @Autowired
//    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
//    @Autowired
//    private SecurityDetectionService securityDetectionService;
//    @Autowired
//    private TCECPushUtil tcecPushUtil;
//
//    @Resource
//    private ChargingPileClient chargingPileClient;
//    @Resource
//    private ChargingGunClient chargingGunClient;
//
//    @Resource
//    private RedisTemplate redisTemplate;
//
//    @Autowired
//    private EnhanceProduce enhanceProduce;
//
//
//
//    @Override
//    protected void handleMessage(ChargingMessage message) throws Exception {
//        log.info("rocket收到的消息内容:{}",message);
//        String serviceId = message.getServiceId();
//        if(!StringUtils.hasLength(serviceId)){
//            return;
//        }
//        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
//        switch (serviceId){
//            case SendTagConstant.ONLINE:
//                OnlineMessage onlineMessage = message.getOnlineMessage();
//                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
//                // 持久化消息
//                Online online = new Online();
//                BeanUtils.copyProperties(onlineMessage,online);
//                onlineService.create(online);
//                break;
//            case SendTagConstant.PING:
//                PingMessage pingMessage = message.getPingMessage();
//                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
//                // 持久化消息
//                Ping ping = new Ping();
//                BeanUtils.copyProperties(pingMessage,ping);
//                pingService.save(ping);
//                //存储缓存中,5分钟有效
//                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
//                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
//                        vo1.setGun_code(pingMessage.getCharging_gun_code());
//                        vo1.setPile_code(pingMessage.getCharging_pile_code());
//                        vo1.setStatus(pingMessage.getCharging_gun_status());
//                        chargingPileClient.updateChargingPileStatus(vo1);
//                    }
//                });
//                break;
//            case SendTagConstant.END_CHARGE:
//                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
//                log.info("充电结束-业务消息处理:{}",endChargeMessage);
//                // 持久化消息
//                EndCharge endCharge = new EndCharge();
//                BeanUtils.copyProperties(endChargeMessage,endCharge);
//                endChargeService.create(endCharge);
//                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor1.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
//                        // 订单id
//                        String transactionSerialNumber = endCharge.getTransaction_serial_number();
//                        ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
//                        chargingOrderMessage.setOrderNumber(transactionSerialNumber);
//                        // 推送充电订单信息
//                        ChargingMessage chargingMessage1 = new ChargingMessage();
//                        chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
//                        chargingMessage1.setOrderMessage(chargingOrderMessage);
//                        enhanceProduce.orderInfoMessage(chargingMessage1);
//                        // 推送充电订单状态
//                        ChargingMessage chargingMessage2 = new ChargingMessage();
//                        chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
//                        chargingMessage2.setOrderMessage(chargingOrderMessage);
//                        enhanceProduce.orderStatusMessage(chargingMessage2);
////                        try {
////                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
////                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
////                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
////                        }catch (Exception e){
////                            e.printStackTrace();
////                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
////                        }
//                    }
//                });
//                break;
//            case SendTagConstant.ERROR_MESSAGE:
//                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
//                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
//                // 持久化消息
//                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
//                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
//                errorMessageMessageService.create(errorMessageMessage);
//                break;
//            case SendTagConstant.BILLING_MODE_VERIFY:
//                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
//                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
//                // 持久化消息
//                BillingModeVerify billingModeVerify = new BillingModeVerify();
//                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
//                billingModeVerifyService.create(billingModeVerify);
//                break;
//            case SendTagConstant.ACQUISITION_BILLING_MODE:
//                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
//                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
//                // 持久化消息
//                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
//                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
//                acquisitionBillingModeService.create(acquisitionBillingMode);
//                break;
//            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
//                try {
//                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
//                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
//                    // 持久化消息
//                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
//                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
//                    // 查询mogondb上一条数据
//                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
//                    // 查询订单
//                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
//                    // 查询当前时间段的计费策略
//                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
//                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
//                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
//                    if (Objects.nonNull(data)) {
//                        uploadRealTimeMonitoringDataService.updateById(data.getId());
//                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
//                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
//                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
//                    }else {
//                        log.info("首次上传实时监测数据");
//                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
//                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
//                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
//                    }
//                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
//                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
//                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
////                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
////                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
//                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
//                    if(i == 0){
//                        log.error("数据存储mongo失败");
//                    }
//
//                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                    threadPoolExecutor2.execute(new Runnable() {
//                        @Override
//                        public void run() {
//                            // 业务处理
//                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
//                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
//                            chargingOrderClient.chargeMonitoring(query);
//                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
//                            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
//                            chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
//                            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
//                            // 推送充电订单信息
//                            ChargingMessage chargingMessage4 = new ChargingMessage();
//                            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
//                            chargingMessage4.setOrderMessage(chargingOrderMessage3);
//                            enhanceProduce.orderInfoMessage(chargingMessage4);
////                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
//                        }
                    }
                });
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
                // 持久化消息
                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
                errorMessageMessageService.create(errorMessageMessage);
                break;
            case SendTagConstant.BILLING_MODE_VERIFY:
                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
                // 持久化消息
                BillingModeVerify billingModeVerify = new BillingModeVerify();
                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
                billingModeVerifyService.create(billingModeVerify);
                break;
            case SendTagConstant.ACQUISITION_BILLING_MODE:
                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
                // 持久化消息
                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
                acquisitionBillingModeService.create(acquisitionBillingMode);
                break;
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                try {
                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
                    // 持久化消息
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
                    // 查询mogondb上一条数据
                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
                    // 查询订单
                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
                    // 查询当前时间段的计费策略
                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
                    if (Objects.nonNull(data)) {
                        uploadRealTimeMonitoringDataService.updateById(data.getId());
                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                    }else {
                        log.info("首次上传实时监测数据");
                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                    }
                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
//                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
//                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                    if(i == 0){
                        log.error("数据存储mongo失败");
                    }
                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                    threadPoolExecutor2.execute(new Runnable() {
                        @Override
                        public void run() {
                            // 业务处理
                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
                            chargingOrderClient.chargeMonitoring(query);
                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
                            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
                            chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
                            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
                            // 推送充电订单信息
                            ChargingMessage chargingMessage4 = new ChargingMessage();
                            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
                            chargingMessage4.setOrderMessage(chargingOrderMessage3);
                            enhanceProduce.orderInfoMessage(chargingMessage4);
//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
            case SendTagConstant.CHARGING_HANDSHAKE:
                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
                // 持久化消息
                ChargingHandshake chargingHandshake = new ChargingHandshake();
                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
                chargingHandshakeService.create(chargingHandshake);
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
                log.info("业务消息处理:{}",parameterSettingMessage);
                // 持久化消息
                ParameterSetting parameterSetting = new ParameterSetting();
                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
                parameterSettingService.create(parameterSetting);
                break;
            case SendTagConstant.BMS_ABORT:
                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
                // 持久化消息
                BmsAbort bmsAbort = new BmsAbort();
                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
                bmsAbortService.create(bmsAbort);
                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor3.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
                    }
                });
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
                // 持久化消息
                MotorAbort motorAbort = new MotorAbort();
                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                motorAbortService.create(motorAbort);
                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor4.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                    }
                });
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
                // 持久化消息
                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor5.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
                        if(Objects.nonNull(chargingOrderBms)){
                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
                        }
                    }
                });
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
                // 持久化消息
                BmsInformation bmsInformation = new BmsInformation();
                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
                bmsInformationService.create(bmsInformation);
                break;
            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
                // 持久化消息
                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
                chargingPileStartsChargingService.create(chargingPileStartsCharging);
                break;
            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
                // 持久化消息
                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                platformStartChargingReplyService.create(platformStartChargingReply);
                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor6.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO();
                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
                        chargingOrderClient.startChargeSuccessfully(message1);
                    }
                });
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
                // 持久化消息
                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                platformStopChargingReplyService.create(platformStopChargingReply);
                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor7.execute(new Runnable() {
                    @Override
                    public void run() {
                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
                    }
                });
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
                // 持久化消息
                TransactionRecord transactionRecord = new TransactionRecord();
                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                transactionRecordService.create(transactionRecord);
                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor8.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
                        if(Objects.nonNull(chargingOrderRecord)){
                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
                        }
                        //计算费用
                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                        BeanUtils.copyProperties(transactionRecordMessage,vo);
                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                        if(200 != code){
                            //失败后添加到队列中继续处理数据
                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
                        }
                    }
                });
                // 添加实时上传记录结束记录
                // 查询mogondb上一条数据
                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
                if(Objects.nonNull(data) && data.getStatus() != 5){
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
                    uploadRealTimeMonitoringData.setStatus(5);
                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                }
                break;
            case SendTagConstant.UPDATE_BALANCE_REPLY:
                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
                // 持久化消息
                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
                updateBalanceReplyService.create(updateBalanceReply);
                break;
            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
                // 持久化消息
                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
                break;
            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
                // 持久化消息
                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
                clearOfflineCardReplyService.create(clearOfflineCardReply);
                break;
            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
                // 持久化消息
                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
                workingParameterSettingReplyService.create(workingParameterSettingReply);
                break;
            case SendTagConstant.TIMING_SETTING:
                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
                // 持久化消息
                TimingSetting timingSetting = new TimingSetting();
                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
                timingSettingService.create(timingSetting);
                break;
            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
                // 持久化消息
                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
                setupBillingModelReplyService.create(setupBillingModelReply);
                break;
            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
                // 持久化消息
                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
                groundLockRealTimeDataService.create(groundLockRealTimeData);
                break;
            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
                // 持久化消息
                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
                break;
            case SendTagConstant.PLATFORM_RESTART_REPLY:
                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
                // 持久化消息
                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
                platformRestartReplyService.create(platformRestartReply);
                break;
            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
                break;
            case SendTagConstant.SECURITY_DETECTION:
                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
                SecurityDetection securityDetection = new SecurityDetection();
                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                securityDetectionService.create(securityDetection);
                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor9.execute(new Runnable() {
                    @Override
                    public void run() {
                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
                        BeanUtils.copyProperties(securityDetection, securityDetection1);
                        chargingOrderClient.securityDetection(securityDetection1);
                    }
                });
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
                break;
        }
    }
    @Override
    protected void handleMaxRetriesExceeded(ChargingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * Whether the retry mechanism is enabled for this listener.
     *
     * @return always {@code true}
     */
    // NOTE(review): presumably consulted by the parent handler when consumption fails — confirm in EnhanceMessageHandler.
    @Override
    protected boolean isRetry() {
        return true;
    }
    /**
     * Whether a consumption failure should be rethrown as an exception.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean throwException() {
        // Whether to throw the exception; false is paired with retry so the failure is handled by the handler itself
        return false;
    }
    /**
     * Message-filtering hook. Filtering can be handled uniformly in the parent class,
     * or implemented here and handled by this listener itself.
     *
     * @param message the message about to be processed
     * @return {@code true} if the message is filtered out, {@code false} if it is not;
     *         this implementation always returns {@code false}
     */
    @Override
    protected boolean filter(ChargingMessage message) {
        // Message filtering can be implemented here
        return false;
    }
    /**
     * Listener entry point for consumed messages. No business processing happens here;
     * the message is handed straight to the parent class via {@code dispatchMessage}.
     *
     * <p>Per the original author's note, the parent performs the common basic operations
     * and then calls back into this subclass's actual handler — the parent is not visible
     * in this file, so confirm against EnhanceMessageHandler.
     *
     * @param message the consumed charging message
     */
    @Override
    public void onMessage(ChargingMessage message) {
        super.dispatchMessage(message);
    }
    /**
     * 处理未正常完成费用计算的订单
     */
    public void transactionRecord(){
        Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD);
        for (String member : members) {
            TransactionRecord transactionRecord = transactionRecordService.findOne(member);
            if(null == transactionRecord){
                redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
            }else{
                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                BeanUtils.copyProperties(transactionRecord, vo);
                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                if(200 == code){
                    redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
                }
            }
        }
    }
}
//                    });
//                } catch (Exception e) {
//                    e.printStackTrace();
//                }
//                break;
//            case SendTagConstant.CHARGING_HANDSHAKE:
//                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
//                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
//                // 持久化消息
//                ChargingHandshake chargingHandshake = new ChargingHandshake();
//                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
//                chargingHandshakeService.create(chargingHandshake);
//                break;
//            case SendTagConstant.PARAMETER_SETTING:
//                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
//                log.info("业务消息处理:{}",parameterSettingMessage);
//                // 持久化消息
//                ParameterSetting parameterSetting = new ParameterSetting();
//                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
//                parameterSettingService.create(parameterSetting);
//                break;
//            case SendTagConstant.BMS_ABORT:
//                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
//                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
//                // 持久化消息
//                BmsAbort bmsAbort = new BmsAbort();
//                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
//                bmsAbortService.create(bmsAbort);
//
//                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor3.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
//                    }
//                });
//                break;
//            case SendTagConstant.MOTOR_ABORT:
//                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
//                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
//                // 持久化消息
//                MotorAbort motorAbort = new MotorAbort();
//                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
//                motorAbortService.create(motorAbort);
//                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor4.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
//                    }
//                });
//                break;
//            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
//                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
//                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
//                // 持久化消息
//                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
//                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
//                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
//                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor5.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
//                        if(Objects.nonNull(chargingOrderBms)){
//                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
//                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
//                        }
//                    }
//                });
//                break;
//            case SendTagConstant.BMS_INFORMATION:
//                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
//                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
//                // 持久化消息
//                BmsInformation bmsInformation = new BmsInformation();
//                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
//                bmsInformationService.create(bmsInformation);
//                break;
//            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
//                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
//                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
//                // 持久化消息
//                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
//                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
//                chargingPileStartsChargingService.create(chargingPileStartsCharging);
//                break;
//            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
//                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
//                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
//                // 持久化消息
//                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
//                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
//                platformStartChargingReplyService.create(platformStartChargingReply);
//                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor6.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO();
//                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
//                        chargingOrderClient.startChargeSuccessfully(message1);
//                    }
//                });
//                break;
//            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
//                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
//                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
//                // 持久化消息
//                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
//                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
//                platformStopChargingReplyService.create(platformStopChargingReply);
//                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor7.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
//                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
//                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
//                    }
//                });
//                break;
//            case SendTagConstant.TRANSACTION_RECORD:
//                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
//                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
//                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
//                // 持久化消息
//                TransactionRecord transactionRecord = new TransactionRecord();
//                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
//                transactionRecordService.create(transactionRecord);
//                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor8.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
//                        if(Objects.nonNull(chargingOrderRecord)){
//                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
//                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
//                        }
//                        //计算费用
//                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
//                        BeanUtils.copyProperties(transactionRecordMessage,vo);
//                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
//                        if(200 != code){
//                            //失败后添加到队列中继续处理数据
//                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
//                        }
//                    }
//                });
//
//
//                // 添加实时上传记录结束记录
//                // 查询mogondb上一条数据
//                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
//                if(Objects.nonNull(data) && data.getStatus() != 5){
//                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
//                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
//                    uploadRealTimeMonitoringData.setStatus(5);
//                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
//                }
//                break;
//            case SendTagConstant.UPDATE_BALANCE_REPLY:
//                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
//                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
//                // 持久化消息
//                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
//                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
//                updateBalanceReplyService.create(updateBalanceReply);
//                break;
//            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
//                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
//                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
//                // 持久化消息
//                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
//                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
//                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
//                break;
//            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
//                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
//                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
//                // 持久化消息
//                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
//                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
//                clearOfflineCardReplyService.create(clearOfflineCardReply);
//                break;
//            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
//                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
//                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
//                // 持久化消息
//                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
//                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
//                workingParameterSettingReplyService.create(workingParameterSettingReply);
//                break;
//            case SendTagConstant.TIMING_SETTING:
//                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
//                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
//                // 持久化消息
//                TimingSetting timingSetting = new TimingSetting();
//                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
//                timingSettingService.create(timingSetting);
//                break;
//            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
//                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
//                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
//                // 持久化消息
//                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
//                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
//                setupBillingModelReplyService.create(setupBillingModelReply);
//                break;
//            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
//                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
//                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
//                // 持久化消息
//                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
//                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
//                groundLockRealTimeDataService.create(groundLockRealTimeData);
//                break;
//            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
//                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
//                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
//                // 持久化消息
//                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
//                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
//                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
//                break;
//            case SendTagConstant.PLATFORM_RESTART_REPLY:
//                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
//                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
//                // 持久化消息
//                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
//                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
//                platformRestartReplyService.create(platformRestartReply);
//                break;
//            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
//                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
//                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
//                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
//                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
//                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
//                break;
//            case SendTagConstant.SECURITY_DETECTION:
//                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
//                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
//                SecurityDetection securityDetection = new SecurityDetection();
//                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
//                securityDetectionService.create(securityDetection);
//                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor9.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
//                        BeanUtils.copyProperties(securityDetection, securityDetection1);
//                        chargingOrderClient.securityDetection(securityDetection1);
//                    }
//                });
//                break;
//            default:
//                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
//                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
//                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
//                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
//                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
//                break;
//        }
//    }
//
//    /**
//     * Hook for a message that could not be consumed within the allowed retries.
//     * The current body only writes an error log entry; the message itself is not used.
//     *
//     * @param message the message whose consumption failed
//     */
//    @Override
//    protected void handleMaxRetriesExceeded(ChargingMessage message) {
//        // This method is called once a message has exceeded the configured number of retries.
//        // In production, a rollback or other business handling can be performed here.
//        log.error("消息消费失败,请执行后续处理");
//    }
//
//
//    /**
//     * Whether the retry mechanism should be applied.
//     *
//     * @return always {@code true}
//     */
//    @Override
//    protected boolean isRetry() {
//        return true;
//    }
//
//    /**
//     * Whether a consumption failure should be thrown as an exception.
//     *
//     * @return always {@code false}
//     */
//    @Override
//    protected boolean throwException() {
//        // Whether to throw the exception; false is paired with retry so that exceptions are handled by this class itself.
//        return false;
//    }
//
//    /**
//     * If message filtering is required, either handle it uniformly in the parent class,
//     * or implement it here and handle it yourself. The current body filters nothing.
//     *
//     * @param message the message to be processed
//     * @return {@code true} if this message is filtered out, {@code false} if it is not filtered
//     */
//    @Override
//    protected boolean filter(ChargingMessage message) {
//        // Message filtering can be done here.
//        return false;
//    }
//
//    /**
//     * Listens for and consumes messages. No business handling is done here: the message is
//     * delegated to the parent class for the basic operations, after which the parent calls
//     * the subclass's actual handler.
//     *
//     * @param message the consumed message, passed unchanged to {@code super.dispatchMessage}
//     */
//    @Override
//    public void onMessage(ChargingMessage message) {
//        super.dispatchMessage(message);
//    }
//
//
//    /**
//     * Processes orders whose fee calculation did not complete normally.
//     *
//     * Reads every member of the Redis set keyed by {@code SendTagConstant.TRANSACTION_RECORD},
//     * looks up the stored transaction record for each member and calls
//     * {@code chargingOrderClient.endChargeBillingCharge} again. A member is removed from the
//     * set when no record is found for it or when the call returns code 200; otherwise it
//     * stays in the set.
//     */
//    public void transactionRecord(){
//        Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD);
//        // NOTE(review): members is iterated without a null check — confirm it cannot be null here.
//        for (String member : members) {
//            TransactionRecord transactionRecord = transactionRecordService.findOne(member);
//            if(null == transactionRecord){
//                // No stored record for this member: drop it from the set.
//                redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
//            }else{
//                // Copy the stored record into the VO and call the billing endpoint again.
//                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
//                BeanUtils.copyProperties(transactionRecord, vo);
//                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
//                if(200 == code){
//                    // Code 200: remove the member from the set; any other code leaves it in place.
//                    redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
//                }
//            }
//        }
//    }
//}