Pu Zhibing
2025-06-02 bc1fd81c844d0f7b17dbe612543073ae049b23a0
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
@@ -5,13 +5,13 @@
import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
import com.ruoyi.chargingPile.api.model.TChargingGun;
import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.integration.drainage.TCECPushUtil;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.mongodb.service.*;
import com.ruoyi.integration.rocket.model.*;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
@@ -20,12 +20,9 @@
import com.ruoyi.order.api.vo.SecurityDetectionVO;
import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -115,19 +112,19 @@
    @Resource
    private RedisTemplate redisTemplate;
    @Autowired
    private EnhanceProduce enhanceProduce;
    public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){
        log.info("rocket收到的消息内容:{}",message);
        log.info("收到的消息内容:{}",message);
        String serviceId = message.getServiceId();
        if(!StringUtils.hasLength(serviceId)){
            return;
        }
        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
        switch (serviceId){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = message.getOnlineMessage();
@@ -146,24 +143,20 @@
                pingService.save(ping);
                //存储缓存中,5分钟有效
                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
                        vo1.setGun_code(pingMessage.getCharging_gun_code());
                        vo1.setPile_code(pingMessage.getCharging_pile_code());
                        vo1.setStatus(pingMessage.getCharging_gun_status());
                        chargingPileClient.updateChargingPileStatus(vo1);
                        try {
                            tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData());
                        }catch (Exception e){
                            e.printStackTrace();
                            System.out.println("设备状态推送监管平台失败:"+e.getMessage());
                        }
                    }
                });
                UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
                vo1.setGun_code(pingMessage.getCharging_gun_code());
                vo1.setPile_code(pingMessage.getCharging_pile_code());
                vo1.setStatus(pingMessage.getCharging_gun_status());
                chargingPileClient.updateChargingPileStatus(vo1);
                // 监管平台推送充电设备状态
                String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code();
                ChargingMessage chargingMessage = new ChargingMessage();
                chargingMessage.setServiceId(SendTagConstant.GUN_STATUS);
                GunStatusMessage gunStatusMessage = new GunStatusMessage();
                gunStatusMessage.setFullNumber(gunCode);
                chargingMessage.setGunStatusMessage(gunStatusMessage);
                SendResult sendResult = enhanceProduce.gunStatusMessage(chargingMessage);
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
@@ -172,22 +165,22 @@
                EndCharge endCharge = new EndCharge();
                BeanUtils.copyProperties(endChargeMessage,endCharge);
                endChargeService.create(endCharge);
                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor1.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                        try {
                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
                        }catch (Exception e){
                            e.printStackTrace();
                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
                        }
                    }
                });
                // 业务处理
                chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                // 订单id
                String transactionSerialNumber = endCharge.getTransaction_serial_number();
                ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
                chargingOrderMessage.setOrderNumber(transactionSerialNumber);
                // 推送充电订单信息
                ChargingMessage chargingMessage1 = new ChargingMessage();
                chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
                chargingMessage1.setOrderMessage(chargingOrderMessage);
                enhanceProduce.orderInfoMessage(chargingMessage1);
                // 推送充电订单状态
                ChargingMessage chargingMessage2 = new ChargingMessage();
                chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
                chargingMessage2.setOrderMessage(chargingOrderMessage);
                enhanceProduce.orderStatusMessage(chargingMessage2);
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
@@ -214,8 +207,8 @@
                acquisitionBillingModeService.create(acquisitionBillingMode);
                break;
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
                try {
                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
                    // 持久化消息
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
@@ -242,26 +235,23 @@
                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
//                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
//                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                    if(i == 0){
                        log.error("数据存储mongo失败");
                    }
                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                    threadPoolExecutor2.execute(new Runnable() {
                        @Override
                        public void run() {
                            // 业务处理
                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
                            chargingOrderClient.chargeMonitoring(query);
                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
                        }
                    });
                    // 业务处理
                    UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
                    BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
                    chargingOrderClient.chargeMonitoring(query);
                    // 订单id
                    ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
                    chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
                    // 推送充电订单信息
                    ChargingMessage chargingMessage4 = new ChargingMessage();
                    chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
                    chargingMessage4.setOrderMessage(chargingOrderMessage3);
                    enhanceProduce.orderInfoMessage(chargingMessage4);
                } catch (Exception e) {
                    e.printStackTrace();
                }
@@ -276,7 +266,7 @@
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
                log.info("业务消息处理:{}",parameterSettingMessage);
                log.info("参数配置-业务消息处理:{}",parameterSettingMessage);
                // 持久化消息
                ParameterSetting parameterSetting = new ParameterSetting();
                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
@@ -289,15 +279,8 @@
                BmsAbort bmsAbort = new BmsAbort();
                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
                bmsAbortService.create(bmsAbort);
                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor3.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
                    }
                });
                // 业务处理
                chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
@@ -306,14 +289,8 @@
                MotorAbort motorAbort = new MotorAbort();
                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                motorAbortService.create(motorAbort);
                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor4.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                    }
                });
                // 业务处理
                chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
@@ -322,18 +299,13 @@
                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor5.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
                        if(Objects.nonNull(chargingOrderBms)){
                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
                        }
                    }
                });
                // 业务处理
                TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
                if(Objects.nonNull(chargingOrderBms)){
                    chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
                    chargingOrderClient.updateChargingOrder(chargingOrderBms);
                }
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
@@ -358,16 +330,11 @@
                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                platformStartChargingReplyService.create(platformStartChargingReply);
                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor6.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
                        chargingOrderClient.startChargeSuccessfully(message1);
                    }
                });
                // 业务处理
                PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
                BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
                chargingOrderClient.startChargeSuccessfully(message1);
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
@@ -376,45 +343,35 @@
                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                platformStopChargingReplyService.create(platformStopChargingReply);
                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor7.execute(new Runnable() {
                    @Override
                    public void run() {
                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
                    }
                });
                PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
                BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
                chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
                transactionRecordMessage.setResult(JSONObject.toJSONString(transactionRecordMessage));
                // 持久化消息
                TransactionRecord transactionRecord = new TransactionRecord();
                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                transactionRecord.setResult(transactionRecordMessage.getResult());
                transactionRecordService.create(transactionRecord);
                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor8.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
                        if(Objects.nonNull(chargingOrderRecord)){
                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
                        }
                        //计算费用
                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                        BeanUtils.copyProperties(transactionRecordMessage,vo);
                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                        if(200 != code){
                            //失败后添加到队列中继续处理数据
                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
                        }
                    }
                });
    
                // 业务处理
                TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
                if(Objects.nonNull(chargingOrderRecord)){
                    chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
                    chargingOrderClient.updateChargingOrder(chargingOrderRecord);
                }
                //计算费用
                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                BeanUtils.copyProperties(transactionRecordMessage,vo);
                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                if(200 != code){
                    //失败后添加到队列中继续处理数据
                    redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
                }
    
                // 添加实时上传记录结束记录
                // 查询mogondb上一条数据
@@ -511,19 +468,15 @@
                SecurityDetection securityDetection = new SecurityDetection();
                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                securityDetectionService.create(securityDetection);
                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor9.execute(new Runnable() {
                    @Override
                    public void run() {
                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
                        BeanUtils.copyProperties(securityDetection, securityDetection1);
                        chargingOrderClient.securityDetection(securityDetection1);
                    }
                });
                SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
                BeanUtils.copyProperties(securityDetection, securityDetection1);
                chargingOrderClient.securityDetection(securityDetection1);
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
                log.info("业务消息处理:{}",platformRemoteUpdateReplyMessage);
                // 持久化消息
                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);