Pu Zhibing
2025-04-30 1adec9fead03f0f788a73f9349ccba86569e31f3
修改rocketmq连接方式和修改发起充电异常情况下将订单挂起的功能
39个文件已修改
2个文件已添加
4918 ■■■■■ 已修改文件
ruoyi-service/ruoyi-integration/pom.xml 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java 151 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java 68 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java 59 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java 70 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java 61 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java 91 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java 67 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java 64 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java 98 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java 179 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java 1251 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java 1039 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java 392 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/pom.xml
@@ -116,30 +116,36 @@
            <scope>test</scope>
        </dependency>
        <!--rocketmq-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.2.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>-->
<!--            <version>2.2.2.RELEASE</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <groupId>org.apache.rocketmq</groupId>-->
<!--                    <artifactId>rocketmq-client</artifactId>-->
<!--                </exclusion>-->
<!--                <exclusion>-->
<!--                    <groupId>org.apache.rocketmq</groupId>-->
<!--                    <artifactId>rocketmq-acl</artifactId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.rocketmq</groupId>-->
<!--            <artifactId>rocketmq-client</artifactId>-->
<!--            <version>4.7.1</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.rocketmq</groupId>-->
<!--            <artifactId>rocketmq-acl</artifactId>-->
<!--            <version>4.7.1</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!--mongodb-->
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java
@@ -6,10 +6,6 @@
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@@ -24,7 +20,6 @@
@SpringBootApplication
@EnableScheduling//开启定时任务
@EnableTransactionManagement//开启事务
@EnableBinding({ Source.class, Sink.class })
public class RuoYiIntegrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(RuoYiIntegrationApplication.class, args);
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java
@@ -300,7 +300,7 @@
    public BaseResult queryStationsInfo(@RequestBody BaseRequest baseRequest, HttpServletRequest request){
        log.info("三方平台查询充电站信息请求参数:" + JacksonUtils.toJson(baseRequest));
        //校验token和签名
        BaseResult baseResult = requestCheckJianGuan(true, baseRequest, request);
        BaseResult baseResult = requestCheck(true, baseRequest, request);
        if(0 != baseResult.getRet()){
            log.info("三方平台查询充电站信息响应Data:");
            baseResult.setData("");
@@ -399,8 +399,8 @@
            stationInfo.setCountryCode(StringUtils.isNotEmpty(datum.getCountryCode()) ? datum.getCountryCode() : "CN");
            stationInfo.setAreaCode(datum.getDistrictsCode());
            stationInfo.setAddress(datum.getAddress());
            stationInfo.setStationTel(datum.getPhone());
            stationInfo.setServiceTel(serviceTel);
            stationInfo.setStationTel(StringUtils.isNotEmpty(datum.getPhone()) ? datum.getPhone().split(",")[0] : "");
            stationInfo.setServiceTel(StringUtils.isNotEmpty(serviceTel) ? serviceTel.split(",")[0] : "");
            switch (datum.getSiteType()){
                case 0:
                    stationInfo.setStationType(StationTypeEnum.OTHER.getType());
@@ -588,7 +588,7 @@
     * @return
     */
    @PostMapping("/pushChargingGunStatus")
    public R pushChargingGunStatus(@RequestParam("fullNumber") String fullNumber, @RequestParam("status") Integer status){
    public R pushChargingGunStatus(@RequestParam(value = "fullNumber") String fullNumber, @RequestParam(value = "status") Integer status){
        ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo();
        connectorStatusInfo.setConnectorID(fullNumber);
        switch (status){
@@ -1282,7 +1282,7 @@
            }
            chargeDetail.setElecPrice(datum.getElectrovalence());
            chargeDetail.setSevicePrice(datum.getServiceCharge());
            chargeDetail.setDetailPower(datum.getChargingCapacity());
            chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_UP));
            chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice());
            chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice());
            chargeDetails.add(chargeDetail);
@@ -1489,6 +1489,7 @@
        notificationChargeOrderInfo.setStartChargeSeq(startChargeSeq);
        TChargingOrder chargingOrder = chargingOrderClient.getChargingOrderByStartChargeSeq(startChargeSeq).getData();
        if(null == chargingOrder){
            log.info("三方平台流水号获取订单失败");
            return null;
        }
        TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData();
@@ -1556,7 +1557,7 @@
            }
            chargeDetail.setElecPrice(datum.getElectrovalence());
            chargeDetail.setSevicePrice(datum.getServiceCharge());
            chargeDetail.setDetailPower(datum.getChargingCapacity());
            chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_UP));
            chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice());
            chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice());
            chargeDetails.add(chargeDetail);
@@ -1751,7 +1752,7 @@
                }
                chargeDetail.setElecPrice(datum.getElectrovalence());
                chargeDetail.setSevicePrice(datum.getServiceCharge());
                chargeDetail.setDetailPower(datum.getChargingCapacity());
                chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_EVEN));
                chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice());
                chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice());
                chargeDetails.add(chargeDetail);
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
@@ -15,7 +15,6 @@
import com.ruoyi.integration.iotda.utils.tools.MessageUtil;
import com.ruoyi.integration.iotda.utils.tools.StrategyUtil;
import com.ruoyi.integration.rocket.model.*;
import com.ruoyi.integration.rocket.produce.ChargingMessageUtil;
import com.ruoyi.integration.rocket.produce.EnhanceProduce;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
@@ -49,9 +48,6 @@
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    
    @Resource
    private ChargingMessageUtil chargingMessageUtil;
    
    
    
@@ -84,8 +80,9 @@
        switch (service_id){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class);
                chargingMessage.setOnlineMessage(onlineMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.onlineMessage(onlineMessage);
//                chargingMessage.setOnlineMessage(onlineMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件
                // 业务处理 登录认证应答
                OnlineReply onlineReply = new OnlineReply();
@@ -102,8 +99,9 @@
                break;
            case SendTagConstant.PING:
                PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class);
                chargingMessage.setPingMessage(pingMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.pingMessage(pingMessage);
//                chargingMessage.setPingMessage(pingMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件
                Pong pong = new Pong();
                pong.setCharging_pile_code(pingMessage.getCharging_pile_code());
@@ -114,22 +112,25 @@
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class);
                chargingMessage.setEndChargeMessage(endChargeMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.endChargeMessage(endChargeMessage);
//                chargingMessage.setEndChargeMessage(endChargeMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class);
                chargingMessage.setErrorMessageMessage(errorMessageMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.errorMessageMessage(errorMessageMessage);
//                chargingMessage.setErrorMessageMessage(errorMessageMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.BILLING_MODE_VERIFY:
                BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class);
                chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.billingModeVerifyMessage(billingModeVerifyMessage);
//                chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件
                BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply();
                if(billingModeVerifyMessage.getBilling_model_code().equals("0")){
@@ -157,8 +158,9 @@
                break;
            case SendTagConstant.ACQUISITION_BILLING_MODE:
                AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class);
                chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.acquisitionBillingModeMessage(acquisitionBillingModeMessage);
//                chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件   计费模型请求应答  1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段
                List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData();
                Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails);
@@ -177,55 +179,63 @@
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                log.info("充电实时数据上传");
                UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class);
                chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.uploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage);
//                chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CHARGING_HANDSHAKE:
                ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class);
                chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.chargingHandshakeMessage(chargingHandshakeMessage);
//                chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class);
                chargingMessage.setParameterSettingMessage(parameterSettingMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.parameterSettingMessage(parameterSettingMessage);
//                chargingMessage.setParameterSettingMessage(parameterSettingMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                break;
            case SendTagConstant.BMS_ABORT:
                BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class);
                chargingMessage.setBmsAbortMessage(bmsAbortMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.bmsAbortMessage(bmsAbortMessage);
//                chargingMessage.setBmsAbortMessage(bmsAbortMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class);
                chargingMessage.setMotorAbortMessage(motorAbortMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.motorAbortMessage(motorAbortMessage);
//                chargingMessage.setMotorAbortMessage(motorAbortMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class);
                chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.bmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage);
//                chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class);
                chargingMessage.setBmsInformationMessage(bmsInformationMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.bmsInformationMessage(bmsInformationMessage);
//                chargingMessage.setBmsInformationMessage(bmsInformationMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class);
                chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.chargingPileStartsChargingMessage(chargingPileStartsChargingMessage);
//                chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件
                PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging();
                platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code());
@@ -239,23 +249,26 @@
                break;
            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class);
                chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.platformStartChargingReplyMessage(platformStartChargingReplyMessage);
//                chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class);
                chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.platformStopChargingReplyMessage(platformStopChargingReplyMessage);
//                chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class);
                transactionRecordMessage.setResult(content.toJSONString());
                chargingMessage.setTransactionRecordMessage(transactionRecordMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.transactionRecordMessage(transactionRecordMessage);
//                chargingMessage.setTransactionRecordMessage(transactionRecordMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件
                ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord();
                confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number());
@@ -265,36 +278,41 @@
                break;
            case SendTagConstant.UPDATE_BALANCE_REPLY:
                UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class);
                chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.updateBalanceReplyMessage(updateBalanceReplyMessage);
//                chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class);
                chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.synchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage);
//                chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class);
                chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.clearOfflineCardReplyMessage(clearOfflineCardReplyMessage);
//                chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class);
                chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.workingParameterSettingReplyMessage(workingParameterSettingReplyMessage);
//                chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.TIMING_SETTING:
                TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class);
                chargingMessage.setTimingSettingMessage(timingSettingMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.timingSettingMessage(timingSettingMessage);
//                chargingMessage.setTimingSettingMessage(timingSettingMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
                // 响应硬件 对时设置应答
                TimingSettingReply timingSettingReply = new TimingSettingReply();
                timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code());
@@ -304,55 +322,62 @@
                break;
            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class);
                chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.setupBillingModelReplyMessage(setupBillingModelReplyMessage);
//                chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class);
                chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.groundLockRealTimeDataMessage(groundLockRealTimeDataMessage);
//                chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class);
                chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.chargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage);
//                chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.PLATFORM_RESTART_REPLY:
                PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class);
                chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.platformRestartReplyMessage(platformRestartReplyMessage);
//                chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class);
                chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.qrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage);
//                chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.SECURITY_DETECTION:
                SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class);
                chargingMessage.setSecurityDetectionMessage(securityDetectionMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.securityDetectionMessage(securityDetectionMessage);
//                chargingMessage.setSecurityDetectionMessage(securityDetectionMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class);
                chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage);
                chargingMessageUtil.handleMessage(chargingMessage);
                sendResult = enhanceProduce.platformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage);
//                chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage);
//                chargingMessageUtil.handleMessage(chargingMessage);
//                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
        }
//        log.info("rocketmq消息下发结果:{}",sendResult);
        log.info("rocketmq消息下发结果:{}",sendResult);
        return AjaxResult.success();
    }
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java
@@ -30,76 +30,22 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_acquisition_billing_mode",
        topic = "charge_acquisition_billing_mode",
        selectorExpression = "acquisition_billing_mode", // 明确指定标签
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "acquisition_billing_mode"
)
public class AcquisitionBillingModeMessageListener extends EnhanceMessageHandler<AcquisitionBillingModeMessage> implements RocketMQListener<AcquisitionBillingModeMessage> {
public class AcquisitionBillingModeMessageListener implements RocketMQListener<AcquisitionBillingModeMessage> {
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    @Autowired
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    @Autowired
    private IotMessageProduce iotMessageProduce;
    @Autowired
    private MessageUtil messageUtil;
    @Override
    protected void handleMessage(AcquisitionBillingModeMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电桩计费模型请求-业务消息处理:{}",message);
        // 持久化消息
        AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
        BeanUtils.copyProperties(message,acquisitionBillingMode);
        acquisitionBillingModeService.create(acquisitionBillingMode);
        // 业务处理  计费模型请求应答  1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段
//        List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(message.getCharging_pile_code()).getData();
//        Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails);
//        // 价格设置
//        AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply();
//        StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply);
//        // 时段设置
//        StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply);
//        iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply));
    }
    @Override
    protected void handleMaxRetriesExceeded(AcquisitionBillingModeMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(AcquisitionBillingModeMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(AcquisitionBillingModeMessage message) {
        super.dispatchMessage(message);
        log.info("充电桩计费模型请求-业务消息处理:{}",message);
        // 持久化消息
        AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
        BeanUtils.copyProperties(message,acquisitionBillingMode);
        acquisitionBillingModeService.create(acquisitionBillingMode);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java
@@ -27,63 +27,22 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_billing_mode_verify",
        topic = "charge_billing_mode_verify",
        selectorExpression = "billing_mode_verify",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "billing_mode_verify"
)
public class BillingModeVerifyMessageListener extends EnhanceMessageHandler<BillingModeVerifyMessage> implements RocketMQListener<BillingModeVerifyMessage> {
public class BillingModeVerifyMessageListener implements RocketMQListener<BillingModeVerifyMessage> {
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Override
    protected void handleMessage(BillingModeVerifyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("计费模型验证请求-业务消息处理:{}",message);
        // 持久化消息
        BillingModeVerify billingModeVerify = new BillingModeVerify();
        BeanUtils.copyProperties(message,billingModeVerify);
        billingModeVerifyService.create(billingModeVerify);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(BillingModeVerifyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(BillingModeVerifyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(BillingModeVerifyMessage message) {
        super.dispatchMessage(message);
        log.info("计费模型验证请求-业务消息处理:{}",message);
        // 持久化消息
        BillingModeVerify billingModeVerify = new BillingModeVerify();
        BeanUtils.copyProperties(message,billingModeVerify);
        billingModeVerifyService.create(billingModeVerify);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java
@@ -15,6 +15,9 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@@ -22,22 +25,21 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_bms_abort",
        topic = "charge_bms_abort",
        selectorExpression = "bms_abort",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "bms_abort"
)
public class BmsAbortMessageListener extends EnhanceMessageHandler<BmsAbortMessage> implements RocketMQListener<BmsAbortMessage> {
public class BmsAbortMessageListener implements RocketMQListener<BmsAbortMessage> {
    @Autowired
    private BmsAbortService bmsAbortService;
    
    @Resource
    private ChargingOrderClient chargingOrderClient;
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    protected void handleMessage(BmsAbortMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
    public void onMessage(BmsAbortMessage message) {
        log.info("充电阶段BMS中止-业务消息处理:{}",message);
        // 持久化消息
        BmsAbort bmsAbort = new BmsAbort();
@@ -45,46 +47,5 @@
        bmsAbortService.create(bmsAbort);
        // 业务处理
        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
    }
    @Override
    protected void handleMaxRetriesExceeded(BmsAbortMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(BmsAbortMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(BmsAbortMessage message) {
        super.dispatchMessage(message);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java
@@ -15,6 +15,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Slf4j
@@ -23,69 +24,32 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_bms_demand_and_charger_exportation",
        topic = "charge_bms_demand_and_charger_exportation",
        selectorExpression = "bms_demand_and_charger_exportation",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "bms_demand_and_charger_exportation"
)
public class BmsDemandAndChargerExportationMessageListener extends EnhanceMessageHandler<BmsDemandAndChargerExportationMessage> implements RocketMQListener<BmsDemandAndChargerExportationMessage> {
public class BmsDemandAndChargerExportationMessageListener implements RocketMQListener<BmsDemandAndChargerExportationMessage> {
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Override
    protected void handleMessage(BmsDemandAndChargerExportationMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",message);
        // 持久化消息
        BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
        BeanUtils.copyProperties(message,bmsDemandAndChargerExportation);
        bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
        // 业务处理
        TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        if(Objects.nonNull(chargingOrder)){
            chargingOrder.setNeedElec(message.getBms_current_requirements());
            chargingOrderClient.updateChargingOrder(chargingOrder);
        }
    }
    @Override
    protected void handleMaxRetriesExceeded(BmsDemandAndChargerExportationMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(BmsDemandAndChargerExportationMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(BmsDemandAndChargerExportationMessage message) {
        super.dispatchMessage(message);
        log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",message);
        // 持久化消息
        BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
        BeanUtils.copyProperties(message,bmsDemandAndChargerExportation);
        bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
        // 业务处理
        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        if(Objects.nonNull(chargingOrderBms)){
            chargingOrderBms.setNeedElec(message.getBms_current_requirements());
            chargingOrderClient.updateChargingOrder(chargingOrderBms);
        }
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java
@@ -1,10 +1,8 @@
package com.ruoyi.integration.rocket.listener;
import com.ruoyi.integration.api.model.BmsDemandAndChargerExportation;
import com.ruoyi.integration.api.model.BmsInformation;
import com.ruoyi.integration.mongodb.service.BmsInformationService;
import com.ruoyi.integration.rocket.model.BmsInformationMessage;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -19,62 +17,22 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_bms_information",
        topic = "charge_bms_information",
        selectorExpression = "bms_information",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "bms_information"
)
public class BmsInformationMessageListener extends EnhanceMessageHandler<BmsInformationMessage> implements RocketMQListener<BmsInformationMessage> {
public class BmsInformationMessageListener implements RocketMQListener<BmsInformationMessage> {
    @Autowired
    private BmsInformationService bmsInformationService;
    @Override
    protected void handleMessage(BmsInformationMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电过程BMS信息-业务消息处理:{}",message);
        // 持久化消息
        BmsInformation bmsInformation = new BmsInformation();
        BeanUtils.copyProperties(message,bmsInformation);
        bmsInformationService.create(bmsInformation);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(BmsInformationMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(BmsInformationMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(BmsInformationMessage message) {
        super.dispatchMessage(message);
        log.info("充电过程BMS信息-业务消息处理:{}",message);
        // 持久化消息
        BmsInformation bmsInformation = new BmsInformation();
        BeanUtils.copyProperties(message,bmsInformation);
        bmsInformationService.create(bmsInformation);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java
@@ -19,62 +19,23 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_charging_handshake",
        topic = "charge_charging_handshake",
        selectorExpression = "charging_handshake",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "charging_handshake"
)
public class ChargingHandshakeMessageListener extends EnhanceMessageHandler<ChargingHandshakeMessage> implements RocketMQListener<ChargingHandshakeMessage> {
public class ChargingHandshakeMessageListener implements RocketMQListener<ChargingHandshakeMessage> {
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    @Override
    protected void handleMessage(ChargingHandshakeMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电握手-业务消息处理:{}",message);
        // 持久化消息
        ChargingHandshake chargingHandshake = new ChargingHandshake();
        BeanUtils.copyProperties(message,chargingHandshake);
        chargingHandshakeService.create(chargingHandshake);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(ChargingHandshakeMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ChargingHandshakeMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ChargingHandshakeMessage message) {
        super.dispatchMessage(message);
        log.info("充电握手-业务消息处理:{}",message);
        // 持久化消息
        ChargingHandshake chargingHandshake = new ChargingHandshake();
        BeanUtils.copyProperties(message,chargingHandshake);
        chargingHandshakeService.create(chargingHandshake);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_charging_pile_returns_ground_lock_data",
        topic = "charge_charging_pile_returns_ground_lock_data",
        selectorExpression = "charging_pile_returns_ground_lock_data",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "charging_pile_returns_ground_lock_data"
)
public class ChargingPileReturnsGroundLockDataMessageListener extends EnhanceMessageHandler<ChargingPileReturnsGroundLockDataMessage> implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> {
public class ChargingPileReturnsGroundLockDataMessageListener implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> {
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Override
    protected void handleMessage(ChargingPileReturnsGroundLockDataMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电桩返回数据(上行)-业务消息处理:{}",message);
        // 持久化消息
        ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
        BeanUtils.copyProperties(message,chargingPileReturnsGroundLockData);
        chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(ChargingPileReturnsGroundLockDataMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ChargingPileReturnsGroundLockDataMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ChargingPileReturnsGroundLockDataMessage message) {
        super.dispatchMessage(message);
        log.info("充电桩返回数据(上行)-业务消息处理:{}",message);
        // 持久化消息
        ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
        BeanUtils.copyProperties(message,chargingPileReturnsGroundLockData);
        chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_charging_pile_starts_charging",
        topic = "charge_charging_pile_starts_charging",
        selectorExpression = "charging_pile_starts_charging",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "charging_pile_starts_charging"
)
public class ChargingPileStartsChargingMessageListener extends EnhanceMessageHandler<ChargingPileStartsChargingMessage> implements RocketMQListener<ChargingPileStartsChargingMessage> {
public class ChargingPileStartsChargingMessageListener implements RocketMQListener<ChargingPileStartsChargingMessage> {
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Override
    protected void handleMessage(ChargingPileStartsChargingMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电桩主动申请启动充电-业务消息处理:{}",message);
        // 持久化消息
        ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
        BeanUtils.copyProperties(message,chargingPileStartsCharging);
        chargingPileStartsChargingService.create(chargingPileStartsCharging);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(ChargingPileStartsChargingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ChargingPileStartsChargingMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ChargingPileStartsChargingMessage message) {
        super.dispatchMessage(message);
        log.info("充电桩主动申请启动充电-业务消息处理:{}",message);
        // 持久化消息
        ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
        BeanUtils.copyProperties(message,chargingPileStartsCharging);
        chargingPileStartsChargingService.create(chargingPileStartsCharging);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_clear_offline_card_reply",
        topic = "charge_clear_offline_card_reply",
        selectorExpression = "clear_offline_card_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "clear_offline_card_reply"
)
public class ClearOfflineCardReplyMessageListener extends EnhanceMessageHandler<ClearOfflineCardReplyMessage> implements RocketMQListener<ClearOfflineCardReplyMessage> {
public class ClearOfflineCardReplyMessageListener implements RocketMQListener<ClearOfflineCardReplyMessage> {
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Override
    protected void handleMessage(ClearOfflineCardReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("离线卡数据清除应答-业务消息处理:{}",message);
        // 持久化消息
        ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
        BeanUtils.copyProperties(message,clearOfflineCardReply);
        clearOfflineCardReplyService.create(clearOfflineCardReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(ClearOfflineCardReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ClearOfflineCardReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ClearOfflineCardReplyMessage message) {
        super.dispatchMessage(message);
        log.info("离线卡数据清除应答-业务消息处理:{}",message);
        // 持久化消息
        ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
        BeanUtils.copyProperties(message,clearOfflineCardReply);
        clearOfflineCardReplyService.create(clearOfflineCardReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java
@@ -3,13 +3,17 @@
import com.ruoyi.integration.api.model.EndCharge;
import com.ruoyi.integration.api.model.Ping;
import com.ruoyi.integration.api.model.Pong;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce;
import com.ruoyi.integration.iotda.utils.tools.MessageUtil;
import com.ruoyi.integration.mongodb.service.EndChargeService;
import com.ruoyi.integration.mongodb.service.PingService;
import com.ruoyi.integration.rocket.model.ChargingMessage;
import com.ruoyi.integration.rocket.model.ChargingOrderMessage;
import com.ruoyi.integration.rocket.model.EndChargeMessage;
import com.ruoyi.integration.rocket.model.PingMessage;
import com.ruoyi.integration.rocket.produce.EnhanceProduce;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import lombok.extern.slf4j.Slf4j;
@@ -28,21 +32,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_end_charge",
        topic = "charge_end_charge",
        selectorExpression = "end_charge",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "end_charge"
)
public class EndChargeMessageListener extends EnhanceMessageHandler<EndChargeMessage> implements RocketMQListener<EndChargeMessage> {
public class EndChargeMessageListener implements RocketMQListener<EndChargeMessage> {
    @Autowired
    private EndChargeService endChargeService;
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Autowired
    private EnhanceProduce enhanceProduce;
    
    
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    protected void handleMessage(EndChargeMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
    public void onMessage(EndChargeMessage message) {
        log.info("充电结束-业务消息处理:{}",message);
        // 持久化消息
        EndCharge endCharge = new EndCharge();
@@ -50,46 +57,19 @@
        endChargeService.create(endCharge);
        // 业务处理
        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
    }
    @Override
    protected void handleMaxRetriesExceeded(EndChargeMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(EndChargeMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(EndChargeMessage message) {
        super.dispatchMessage(message);
        // 订单id
        String transactionSerialNumber = endCharge.getTransaction_serial_number();
        ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
        chargingOrderMessage.setOrderNumber(transactionSerialNumber);
        // 推送充电订单信息
        ChargingMessage chargingMessage1 = new ChargingMessage();
        chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
        chargingMessage1.setOrderMessage(chargingOrderMessage);
        enhanceProduce.orderInfoMessage(chargingMessage1);
        // 推送充电订单状态
        ChargingMessage chargingMessage2 = new ChargingMessage();
        chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
        chargingMessage2.setOrderMessage(chargingOrderMessage);
        enhanceProduce.orderStatusMessage(chargingMessage2);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java
@@ -17,64 +17,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_error_message",
        topic = "charge_error_message",
        selectorExpression = "error_message",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "error_message"
)
public class ErrorMessageMessageListener extends EnhanceMessageHandler<ErrorMessageMessage> implements RocketMQListener<ErrorMessageMessage> {
public class ErrorMessageMessageListener implements RocketMQListener<ErrorMessageMessage> {
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    @Override
    protected void handleMessage(ErrorMessageMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("错误报文-业务消息处理:{}",message);
        // 持久化消息
        ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
        BeanUtils.copyProperties(message,errorMessageMessage);
        errorMessageMessageService.create(errorMessageMessage);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(ErrorMessageMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ErrorMessageMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ErrorMessageMessage message) {
        super.dispatchMessage(message);
        log.info("错误报文-业务消息处理:{}",message);
        // 持久化消息
        ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
        BeanUtils.copyProperties(message,errorMessageMessage);
        errorMessageMessageService.create(errorMessageMessage);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_ground_lock_real_time_data",
        topic = "charge_ground_lock_real_time_data",
        selectorExpression = "ground_lock_real_time_data",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "ground_lock_real_time_data"
)
public class GroundLockRealTimeDataMessageListener extends EnhanceMessageHandler<GroundLockRealTimeDataMessage> implements RocketMQListener<GroundLockRealTimeDataMessage> {
public class GroundLockRealTimeDataMessageListener implements RocketMQListener<GroundLockRealTimeDataMessage> {
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Override
    protected void handleMessage(GroundLockRealTimeDataMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",message);
        // 持久化消息
        GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
        BeanUtils.copyProperties(message,groundLockRealTimeData);
        groundLockRealTimeDataService.create(groundLockRealTimeData);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(GroundLockRealTimeDataMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(GroundLockRealTimeDataMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(GroundLockRealTimeDataMessage message) {
        super.dispatchMessage(message);
        log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",message);
        // 持久化消息
        GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
        BeanUtils.copyProperties(message,groundLockRealTimeData);
        groundLockRealTimeDataService.create(groundLockRealTimeData);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java
@@ -22,10 +22,9 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_motor_abort",
        topic = "charge_motor_abort",
        selectorExpression = "motor_abort",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "motor_abort"
)
public class MotorAbortMessageListener extends EnhanceMessageHandler<MotorAbortMessage> implements RocketMQListener<MotorAbortMessage> {
public class MotorAbortMessageListener implements RocketMQListener<MotorAbortMessage> {
    @Autowired
    private MotorAbortService motorAbortService;
@@ -34,11 +33,11 @@
    private ChargingOrderClient chargingOrderClient;
    
    
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    protected void handleMessage(MotorAbortMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
    public void onMessage(MotorAbortMessage message) {
        log.info("充电阶段充电机中止-业务消息处理:{}",message);
        // 持久化消息
        MotorAbort motorAbort = new MotorAbort();
@@ -46,46 +45,5 @@
        motorAbortService.create(motorAbort);
        // 业务处理
        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
    }
    @Override
    protected void handleMaxRetriesExceeded(MotorAbortMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(MotorAbortMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(MotorAbortMessage message) {
        super.dispatchMessage(message);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java
@@ -1,5 +1,6 @@
package com.ruoyi.integration.rocket.listener;
import com.alibaba.fastjson.JSON;
import com.ruoyi.integration.api.model.Online;
import com.ruoyi.integration.mongodb.service.OnlineService;
import com.ruoyi.integration.rocket.model.OnlineMessage;
@@ -16,63 +17,22 @@
@RocketMQMessageListener(
        consumerGroup = "charge_online",
        topic = "charge_online",
        selectorExpression = "online", // 明确指定标签
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "online"
)
public class OnlineMessageListener extends EnhanceMessageHandler<OnlineMessage> implements RocketMQListener<OnlineMessage> {
public class OnlineMessageListener implements RocketMQListener<OnlineMessage> {
    @Autowired
    private OnlineService onlineService;
    @Override
    protected void handleMessage(OnlineMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电桩登录认证业务消息处理:{}",message);
        // 持久化消息
        Online online = new Online();
        BeanUtils.copyProperties(message,online);
        onlineService.create(online);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(OnlineMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(OnlineMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(OnlineMessage message) {
        super.dispatchMessage(message);
        log.info("充电桩登录认证业务消息处理:{}", JSON.toJSONString(message));
        // 持久化消息
        Online online = new Online();
        BeanUtils.copyProperties(message,online);
        onlineService.create(online);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java
@@ -17,65 +17,26 @@
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        consumerGroup = "charge_parameter_setting",
        topic = "charge_parameter_setting",
        selectorExpression = "parameter_setting"
)
public class ParameterSettingMessageListener extends EnhanceMessageHandler<ParameterSettingMessage> implements RocketMQListener<ParameterSettingMessage> {
public class ParameterSettingMessageListener implements RocketMQListener<ParameterSettingMessage> {
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Override
    protected void handleMessage(ParameterSettingMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("业务消息处理:{}",message);
        // 持久化消息
        ParameterSetting parameterSetting = new ParameterSetting();
        BeanUtils.copyProperties(message,parameterSetting);
        parameterSettingService.create(parameterSetting);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(ParameterSettingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ParameterSettingMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ParameterSettingMessage message) {
        super.dispatchMessage(message);
        log.info("参数配置-业务消息处理:{}",message);
        // 持久化消息
        ParameterSetting parameterSetting = new ParameterSetting();
        BeanUtils.copyProperties(message,parameterSetting);
        parameterSettingService.create(parameterSetting);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java
@@ -1,18 +1,30 @@
package com.ruoyi.integration.rocket.listener;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
import com.ruoyi.integration.api.model.Ping;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.mongodb.service.PingService;
import com.ruoyi.integration.rocket.model.ChargingMessage;
import com.ruoyi.integration.rocket.model.GunStatusMessage;
import com.ruoyi.integration.rocket.model.OnlineMessage;
import com.ruoyi.integration.rocket.model.PingMessage;
import com.ruoyi.integration.rocket.produce.EnhanceProduce;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@@ -20,62 +32,47 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_ping",
        topic = "charge_ping",
        selectorExpression = "ping",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "ping"
)
public class PingMessageListener extends EnhanceMessageHandler<PingMessage> implements RocketMQListener<PingMessage> {
public class PingMessageListener implements RocketMQListener<PingMessage> {
    @Autowired
    private PingService pingService;
    @Override
    protected void handleMessage(PingMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电桩心跳包-业务消息处理:{}",message);
        // 持久化消息
        Ping ping = new Ping();
        BeanUtils.copyProperties(message,ping);
        pingService.create(ping);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(PingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(PingMessage message) {
        // 此处可做消息过滤
        return false;
    }
    @Resource
    private RedisTemplate redisTemplate;
    @Autowired
    private EnhanceProduce enhanceProduce;
    @Resource
    private ChargingPileClient chargingPileClient;
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(PingMessage message) {
        super.dispatchMessage(message);
        log.info("充电桩心跳包-业务消息处理:{}", JSON.toJSONString(message));
        // 持久化消息
        Ping ping = new Ping();
        BeanUtils.copyProperties(message,ping);
        pingService.save(ping);
        //存储缓存中,5分钟有效
        redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
        vo1.setGun_code(message.getCharging_gun_code());
        vo1.setPile_code(message.getCharging_pile_code());
        vo1.setStatus(message.getCharging_gun_status());
        chargingPileClient.updateChargingPileStatus(vo1);
        // 监管平台推送充电设备状态
        SendResult sendResult;
        String gunCode = message.getCharging_pile_code() + message.getCharging_gun_code();
        ChargingMessage chargingMessage = new ChargingMessage();
        chargingMessage.setServiceId(SendTagConstant.GUN_STATUS);
        GunStatusMessage gunStatusMessage = new GunStatusMessage();
        gunStatusMessage.setFullNumber(gunCode);
        chargingMessage.setGunStatusMessage(gunStatusMessage);
        sendResult = enhanceProduce.gunStatusMessage(chargingMessage);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java
@@ -1,6 +1,7 @@
package com.ruoyi.integration.rocket.listener;
import com.ruoyi.integration.api.model.Online;
import com.ruoyi.integration.api.model.ParameterSetting;
import com.ruoyi.integration.api.model.PlatformRemoteUpdateReply;
import com.ruoyi.integration.mongodb.service.PlatformRemoteUpdateReplyService;
import com.ruoyi.integration.rocket.model.PlatformRemoteUpdateReplyMessage;
@@ -19,63 +20,23 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_platform_remote_update_reply",
        topic = "charge_platform_remote_update_reply",
        selectorExpression = "platform_remote_update_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "platform_remote_update_reply"
)
public class PlatformRemoteUpdateReplyMessageListener extends EnhanceMessageHandler<PlatformRemoteUpdateReplyMessage> implements RocketMQListener<PlatformRemoteUpdateReplyMessage> {
public class PlatformRemoteUpdateReplyMessageListener implements RocketMQListener<PlatformRemoteUpdateReplyMessage> {
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Override
    protected void handleMessage(PlatformRemoteUpdateReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("远程更新应答-业务消息处理:{}",message);
        // 持久化消息
        PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
        BeanUtils.copyProperties(message,platformRemoteUpdateReply);
        platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(PlatformRemoteUpdateReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(PlatformRemoteUpdateReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(PlatformRemoteUpdateReplyMessage message) {
        super.dispatchMessage(message);
        log.info("业务消息处理:{}",message);
        // 持久化消息
        PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
        BeanUtils.copyProperties(message,platformRemoteUpdateReply);
        platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java
@@ -19,63 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_platform_restart_reply",
        topic = "charge_platform_restart_reply",
        selectorExpression = "platform_restart_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "platform_restart_reply"
)
public class PlatformRestartReplyMessageListener extends EnhanceMessageHandler<PlatformRestartReplyMessage> implements RocketMQListener<PlatformRestartReplyMessage> {
public class PlatformRestartReplyMessageListener implements RocketMQListener<PlatformRestartReplyMessage> {
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Override
    protected void handleMessage(PlatformRestartReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("远程重启应答-业务消息处理:{}",message);
        // 持久化消息
        PlatformRestartReply platformRestartReply = new PlatformRestartReply();
        BeanUtils.copyProperties(message,platformRestartReply);
        platformRestartReplyService.create(platformRestartReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(PlatformRestartReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(PlatformRestartReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(PlatformRestartReplyMessage message) {
        super.dispatchMessage(message);
        log.info("远程重启应答-业务消息处理:{}",message);
        // 持久化消息
        PlatformRestartReply platformRestartReply = new PlatformRestartReply();
        BeanUtils.copyProperties(message,platformRestartReply);
        platformRestartReplyService.create(platformRestartReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java
@@ -23,10 +23,9 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_platform_start_charging_reply",
        topic = "charge_platform_start_charging_reply",
        selectorExpression = "platform_start_charging_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "platform_start_charging_reply"
)
public class PlatformStartChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStartChargingReplyMessage> implements RocketMQListener<PlatformStartChargingReplyMessage> {
public class PlatformStartChargingReplyMessageListener implements RocketMQListener<PlatformStartChargingReplyMessage> {
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
@@ -36,58 +35,20 @@
    
    
    @Override
    protected void handleMessage(PlatformStartChargingReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("远程启机命令回复-业务消息处理:{}",message);
        // 持久化消息
        PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
        BeanUtils.copyProperties(message,platformStartChargingReply);
        platformStartChargingReplyService.create(platformStartChargingReply);
        // 业务处理
        PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
        BeanUtils.copyProperties(message, message1);
        chargingOrderClient.startChargeSuccessfully(message1);
    }
    @Override
    protected void handleMaxRetriesExceeded(PlatformStartChargingReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(PlatformStartChargingReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(PlatformStartChargingReplyMessage message) {
        super.dispatchMessage(message);
        log.info("远程启机命令回复-业务消息处理:{}",message);
        // 持久化消息
        PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
        BeanUtils.copyProperties(message,platformStartChargingReply);
        platformStartChargingReplyService.create(platformStartChargingReply);
        // 业务处理
        PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
        BeanUtils.copyProperties(message, message1);
        chargingOrderClient.startChargeSuccessfully(message1);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java
@@ -5,6 +5,8 @@
import com.ruoyi.integration.mongodb.service.PlatformStopChargingReplyService;
import com.ruoyi.integration.rocket.model.PlatformStopChargingReplyMessage;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -13,68 +15,37 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_platform_stop_charging_reply",
        topic = "charge_platform_stop_charging_reply",
        selectorExpression = "platform_stop_charging_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "platform_stop_charging_reply"
)
public class PlatformStopChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStopChargingReplyMessage> implements RocketMQListener<PlatformStopChargingReplyMessage> {
public class PlatformStopChargingReplyMessageListener implements RocketMQListener<PlatformStopChargingReplyMessage> {
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Override
    protected void handleMessage(PlatformStopChargingReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("远程停机命令回复-业务消息处理:{}",message);
        // 持久化消息
        PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
        BeanUtils.copyProperties(message,platformStopChargingReply);
        platformStopChargingReplyService.create(platformStopChargingReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(PlatformStopChargingReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(PlatformStopChargingReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    @Resource
    private ChargingOrderClient chargingOrderClient;
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(PlatformStopChargingReplyMessage message) {
        super.dispatchMessage(message);
        log.info("远程停机命令回复-业务消息处理:{}",message);
        // 持久化消息
        PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
        BeanUtils.copyProperties(message,platformStopChargingReply);
        platformStopChargingReplyService.create(platformStopChargingReply);
        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java
New file
@@ -0,0 +1,40 @@
package com.ruoyi.integration.rocket.listener;
import com.ruoyi.integration.api.model.QrCodeDeliveryReply;
import com.ruoyi.integration.mongodb.service.QrCodeDeliveryReplyService;
import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage;
import com.ruoyi.integration.rocket.model.QrCodeDeliveryReplyMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author zhibing.pu
 * @Date 2025/4/28 14:57
 */
@Slf4j
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_qr_code_delivery_reply",
        topic = "charge_qr_code_delivery_reply",
        selectorExpression = "qr_code_delivery_reply"
)
public class QrCodeDeliveryReplyMessageListener implements RocketMQListener<QrCodeDeliveryReplyMessage> {
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Override
    public void onMessage(QrCodeDeliveryReplyMessage message) {
        log.info("二维码下发应答-业务消息处理:{}",message);
        QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
        BeanUtils.copyProperties(message,qrCodeDeliveryReply);
        qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java
@@ -1,10 +1,13 @@
package com.ruoyi.integration.rocket.listener;
import com.ruoyi.integration.api.model.Online;
import com.ruoyi.integration.api.model.PlatformRemoteUpdateReply;
import com.ruoyi.integration.api.model.PlatformStopChargingReply;
import com.ruoyi.integration.api.model.QueryOfflineCardReply;
import com.ruoyi.integration.mongodb.service.QueryOfflineCardReplyService;
import com.ruoyi.integration.rocket.model.QueryOfflineCardReplyMessage;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -17,64 +20,27 @@
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        consumerGroup = "charge_query_offline_card_reply",
        topic = "charge_query_offline_card_reply",
        selectorExpression = "query_offline_card_reply"
)
public class QueryOfflineCardReplyMessageListener extends EnhanceMessageHandler<QueryOfflineCardReplyMessage> implements RocketMQListener<QueryOfflineCardReplyMessage> {
public class QueryOfflineCardReplyMessageListener implements RocketMQListener<QueryOfflineCardReplyMessage> {
    @Autowired
    private QueryOfflineCardReplyService queryOfflineCardReplyService;
    @Override
    protected void handleMessage(QueryOfflineCardReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("离线卡数据查询应答-业务消息处理:{}",message);
        // 持久化消息
        QueryOfflineCardReply queryOfflineCardReply = new QueryOfflineCardReply();
        BeanUtils.copyProperties(message,queryOfflineCardReply);
        queryOfflineCardReplyService.create(queryOfflineCardReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(QueryOfflineCardReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(QueryOfflineCardReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(QueryOfflineCardReplyMessage message) {
        super.dispatchMessage(message);
        log.info("业务消息处理:{}",message);
        // 持久化消息
        QueryOfflineCardReply queryOfflineCardReply = new QueryOfflineCardReply();
        BeanUtils.copyProperties(message,queryOfflineCardReply);
        queryOfflineCardReplyService.create(queryOfflineCardReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java
New file
@@ -0,0 +1,50 @@
package com.ruoyi.integration.rocket.listener;
import com.ruoyi.integration.api.model.SecurityDetection;
import com.ruoyi.integration.mongodb.service.SecurityDetectionService;
import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage;
import com.ruoyi.integration.rocket.model.SecurityDetectionMessage;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.vo.SecurityDetectionVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author zhibing.pu
 * @Date 2025/4/28 14:59
 */
@Slf4j
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_security_detection",
        topic = "charge_security_detection",
        selectorExpression = "security_detection"
)
public class SecurityDetectionMessageListener implements RocketMQListener<SecurityDetectionMessage> {
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    @Override
    public void onMessage(SecurityDetectionMessage message) {
        log.info("安全监测-业务消息处理:{}",message);
        SecurityDetection securityDetection = new SecurityDetection();
        BeanUtils.copyProperties(message,securityDetection);
        securityDetectionService.create(securityDetection);
        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
        BeanUtils.copyProperties(securityDetection, securityDetection1);
        chargingOrderClient.securityDetection(securityDetection1);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java
@@ -19,63 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_setup_billing_model_reply",
        topic = "charge_setup_billing_model_reply",
        selectorExpression = "setup_billing_model_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "setup_billing_model_reply"
)
public class SetupBillingModelReplyMessageListener extends EnhanceMessageHandler<SetupBillingModelReplyMessage> implements RocketMQListener<SetupBillingModelReplyMessage> {
public class SetupBillingModelReplyMessageListener implements RocketMQListener<SetupBillingModelReplyMessage> {
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Override
    protected void handleMessage(SetupBillingModelReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("计费模型应答-业务消息处理:{}",message);
        // 持久化消息
        SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
        BeanUtils.copyProperties(message,setupBillingModelReply);
        setupBillingModelReplyService.create(setupBillingModelReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(SetupBillingModelReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(SetupBillingModelReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(SetupBillingModelReplyMessage message) {
        super.dispatchMessage(message);
        log.info("计费模型应答-业务消息处理:{}",message);
        // 持久化消息
        SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
        BeanUtils.copyProperties(message,setupBillingModelReply);
        setupBillingModelReplyService.create(setupBillingModelReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java
@@ -19,62 +19,25 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_synchronize_offline_card_reply",
        topic = "charge_synchronize_offline_card_reply",
        selectorExpression = "synchronize_offline_card_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "synchronize_offline_card_reply"
)
public class SynchronizeOfflineCardReplyMessageListener extends EnhanceMessageHandler<SynchronizeOfflineCardReplyMessage> implements RocketMQListener<SynchronizeOfflineCardReplyMessage> {
public class SynchronizeOfflineCardReplyMessageListener implements RocketMQListener<SynchronizeOfflineCardReplyMessage> {
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Override
    protected void handleMessage(SynchronizeOfflineCardReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("卡数据同步应答-业务消息处理:{}",message);
        // 持久化消息
        SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
        BeanUtils.copyProperties(message,synchronizeOfflineCardReply);
        synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(SynchronizeOfflineCardReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(SynchronizeOfflineCardReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(SynchronizeOfflineCardReplyMessage message) {
        super.dispatchMessage(message);
        log.info("卡数据同步应答-业务消息处理:{}",message);
        // 持久化消息
        SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
        BeanUtils.copyProperties(message,synchronizeOfflineCardReply);
        synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java
@@ -26,62 +26,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_timing_setting",
        topic = "charge_timing_setting",
        selectorExpression = "timing_setting",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "timing_setting"
)
public class TimingSettingMessageListener extends EnhanceMessageHandler<TimingSettingMessage> implements RocketMQListener<TimingSettingMessage> {
public class TimingSettingMessageListener implements RocketMQListener<TimingSettingMessage> {
    @Autowired
    private TimingSettingService timingSettingService;
    @Override
    protected void handleMessage(TimingSettingMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("对时设置-业务消息处理:{}",message);
        // 持久化消息
        TimingSetting timingSetting = new TimingSetting();
        BeanUtils.copyProperties(message,timingSetting);
        timingSettingService.create(timingSetting);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(TimingSettingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(TimingSettingMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(TimingSettingMessage message) {
        super.dispatchMessage(message);
        log.info("对时设置-业务消息处理:{}",message);
        // 持久化消息
        TimingSetting timingSetting = new TimingSetting();
        BeanUtils.copyProperties(message,timingSetting);
        timingSettingService.create(timingSetting);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java
@@ -1,24 +1,31 @@
package com.ruoyi.integration.rocket.listener;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.integration.api.model.ConfirmTransactionRecord;
import com.ruoyi.integration.api.model.Online;
import com.ruoyi.integration.api.model.TransactionRecord;
import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce;
import com.ruoyi.integration.iotda.utils.tools.MessageUtil;
import com.ruoyi.integration.mongodb.service.TransactionRecordService;
import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService;
import com.ruoyi.integration.rocket.model.TransactionRecordMessage;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
@Slf4j
@@ -27,69 +34,56 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_transaction_record",
        topic = "charge_transaction_record",
        selectorExpression = "transaction_record",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "transaction_record"
)
public class TransactionRecordMessageListener extends EnhanceMessageHandler<TransactionRecordMessage> implements RocketMQListener<TransactionRecordMessage> {
public class TransactionRecordMessageListener implements RocketMQListener<TransactionRecordMessage> {
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private ChargingOrderClient chargingOrderClient;
    @Override
    protected void handleMessage(TransactionRecordMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("交易记录-业务消息处理:{}",message);
        // 持久化消息
        TransactionRecord transactionRecord = new TransactionRecord();
        BeanUtils.copyProperties(message,transactionRecord);
        transactionRecordService.create(transactionRecord);
        // 业务处理
        TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        if(Objects.nonNull(chargingOrder)){
            chargingOrder.setTotalElectricity(message.getTotal_electricity());
            chargingOrderClient.updateChargingOrder(chargingOrder);
        }
    }
    @Override
    protected void handleMaxRetriesExceeded(TransactionRecordMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(TransactionRecordMessage message) {
        // 此处可做消息过滤
        return false;
    }
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Resource
    private RedisTemplate redisTemplate;
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(TransactionRecordMessage message) {
        super.dispatchMessage(message);
        log.info("交易记录-业务消息处理:{}",message);
        message.setResult(JSONObject.toJSONString(message));
        // 持久化消息
        TransactionRecord transactionRecord = new TransactionRecord();
        BeanUtils.copyProperties(message,transactionRecord);
        transactionRecord.setResult(message.getResult());
        transactionRecordService.create(transactionRecord);
        // 业务处理
        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        if(Objects.nonNull(chargingOrderRecord)){
            chargingOrderRecord.setTotalElectricity(message.getTotal_electricity());
            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
        }
        //计算费用
        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
        BeanUtils.copyProperties(message,vo);
        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
        if(200 != code){
            //失败后添加到队列中继续处理数据
            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, message.getTransaction_serial_number());
        }
        // 添加实时上传记录结束记录
        // 查询mogondb上一条数据
        UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
        if(Objects.nonNull(data) && data.getStatus() != 5){
            UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
            BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
            uploadRealTimeMonitoringData.setStatus(5);
            uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
        }
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_update_balance_reply",
        topic = "charge_update_balance_reply",
        selectorExpression = "update_balance_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "update_balance_reply"
)
public class UpdateBalanceReplyMessageListener extends EnhanceMessageHandler<UpdateBalanceReplyMessage> implements RocketMQListener<UpdateBalanceReplyMessage> {
public class UpdateBalanceReplyMessageListener implements RocketMQListener<UpdateBalanceReplyMessage> {
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Override
    protected void handleMessage(UpdateBalanceReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("余额更新应答-业务消息处理:{}",message);
        // 持久化消息
        UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
        BeanUtils.copyProperties(message,updateBalanceReply);
        updateBalanceReplyService.create(updateBalanceReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(UpdateBalanceReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(UpdateBalanceReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(UpdateBalanceReplyMessage message) {
        super.dispatchMessage(message);
        log.info("余额更新应答-业务消息处理:{}",message);
        // 持久化消息
        UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
        BeanUtils.copyProperties(message,updateBalanceReply);
        updateBalanceReplyService.create(updateBalanceReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java
@@ -10,8 +10,12 @@
import com.ruoyi.integration.api.feignClient.TCECClient;
import com.ruoyi.integration.api.model.Online;
import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService;
import com.ruoyi.integration.rocket.model.ChargingMessage;
import com.ruoyi.integration.rocket.model.ChargingOrderMessage;
import com.ruoyi.integration.rocket.model.UploadRealTimeMonitoringDataMessage;
import com.ruoyi.integration.rocket.produce.EnhanceProduce;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
@@ -36,10 +40,9 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_upload_real_time_monitoring_data",
        topic = "charge_upload_real_time_monitoring_data",
        selectorExpression = "upload_real_time_monitoring_data",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "upload_real_time_monitoring_data"
)
public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> {
public class UploadRealTimeMonitoringDataMessageListener implements RocketMQListener<UploadRealTimeMonitoringDataMessage> {
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
@@ -48,129 +51,65 @@
    private ChargingOrderClient chargingOrderClient;
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    @Resource
    private ChargingGunClient chargingGunClient;
    @Resource
    private FaultMessageClient faultMessageClient;
    @Autowired
    private EnhanceProduce enhanceProduce;
    
    @Resource
    private TCECClient tcecClient;
    @Override
    protected void handleMessage(UploadRealTimeMonitoringDataMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("上传实时监测数据-业务消息处理:{}",message);
        // 持久化消息
        UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
        BeanUtils.copyProperties(message,uploadRealTimeMonitoringData);
        // 查询mogondb上一条数据
        UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
        // 查询订单
        TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
        // 查询当前时间段的计费策略
        TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
        uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
        uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
        if (Objects.nonNull(data)) {
            uploadRealTimeMonitoringData.setLast_time(data.getLast_time());
            uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().divide(data.getPaid_amount()));
            uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().divide(data.getCharging_degree()));
            uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
        }else {
            log.info("首次上传实时监测数据");
            uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount());
            uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree());
            uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
        }
        uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
        // 业务处理
        UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
        BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
        chargingOrderClient.chargeMonitoring(query);
        GetChargingGunByCode code = new GetChargingGunByCode();
        code.setCharging_pile_code(message.getCharging_pile_code());
        code.setCharging_gun_code(message.getCharging_gun_code());
        TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData();
        if(Objects.nonNull(chargingGun)){
            // 存储状态信息
            TFaultMessage faultMessage = new TFaultMessage();
            if(message.getCharging_gun_status().equals(0) || message.getCharging_gun_status().equals(1)){
                faultMessage.setSiteId(chargingGun.getSiteId());
                faultMessage.setChargingPileId(chargingGun.getChargingPileId());
                faultMessage.setChargingGunId(chargingGun.getId());
                switch (message.getCharging_gun_status()){
                    case 0:
                        faultMessage.setStatus(1);
                        chargingGun.setStatus(1);
                        break;
                    case 1:
                        faultMessage.setStatus(2);
                        chargingGun.setStatus(7);
                        break;
                }
                faultMessage.setDownTime(LocalDateTime.now());
                faultMessageClient.createFaultMessage(faultMessage);
            }else {
                switch (message.getCharging_gun_status()){
                    case 2:
                        chargingGun.setStatus(2);
                        break;
                    case 3:
                        chargingGun.setStatus(4);
                        break;
                }
                // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录
                faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData();
                if(Objects.nonNull(faultMessage)){
                    faultMessage.setEndTime(LocalDateTime.now());
                    faultMessageClient.updateFaultMessage(faultMessage);
                }
            }
            chargingGunClient.updateChargingGunById(chargingGun);
            //推送状态给三方平台
            tcecClient.pushChargingGunStatus(chargingGun.getFullNumber(), chargingGun.getStatus());
        }
    }
    @Override
    protected void handleMaxRetriesExceeded(UploadRealTimeMonitoringDataMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(UploadRealTimeMonitoringDataMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(UploadRealTimeMonitoringDataMessage message) {
        super.dispatchMessage(message);
        try {
            log.info("上传实时监测数据-业务消息处理:{}",message);
            // 持久化消息
            UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
            BeanUtils.copyProperties(message,uploadRealTimeMonitoringData);
            // 查询mogondb上一条数据
            UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number());
            // 查询订单
            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData();
            // 查询当前时间段的计费策略
            TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
            uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
            uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
            if (Objects.nonNull(data)) {
                uploadRealTimeMonitoringDataService.updateById(data.getId());
                uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().subtract(data.getPaid_amount()));
                uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().subtract(data.getCharging_degree()));
                uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
            }else {
                log.info("首次上传实时监测数据");
                uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount());
                uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree());
                uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
            }
            uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
            uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
            uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
            int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
            if(i == 0){
                log.error("数据存储mongo失败");
            }
            // 业务处理
            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
            chargingOrderClient.chargeMonitoring(query);
            // 订单id
            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
            // 推送充电订单信息
            ChargingMessage chargingMessage4 = new ChargingMessage();
            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
            chargingMessage4.setOrderMessage(chargingOrderMessage3);
            enhanceProduce.orderInfoMessage(chargingMessage4);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_working_parameter_setting_reply",
        topic = "charge_working_parameter_setting_reply",
        selectorExpression = "working_parameter_setting_reply",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
        selectorExpression = "working_parameter_setting_reply"
)
public class WorkingParameterSettingReplyMessageListener extends EnhanceMessageHandler<WorkingParameterSettingReplyMessage> implements RocketMQListener<WorkingParameterSettingReplyMessage> {
public class WorkingParameterSettingReplyMessageListener implements RocketMQListener<WorkingParameterSettingReplyMessage> {
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Override
    protected void handleMessage(WorkingParameterSettingReplyMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        log.info("充电桩工作参数设置应答-业务消息处理:{}",message);
        // 持久化消息
        WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
        BeanUtils.copyProperties(message,workingParameterSettingReply);
        workingParameterSettingReplyService.create(workingParameterSettingReply);
        // 业务处理
    }
    @Override
    protected void handleMaxRetriesExceeded(WorkingParameterSettingReplyMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(WorkingParameterSettingReplyMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(WorkingParameterSettingReplyMessage message) {
        super.dispatchMessage(message);
        log.info("充电桩工作参数设置应答-业务消息处理:{}",message);
        // 持久化消息
        WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
        BeanUtils.copyProperties(message,workingParameterSettingReply);
        workingParameterSettingReplyService.create(workingParameterSettingReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
@@ -1,628 +1,625 @@
package com.ruoyi.integration.rocket.produce;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient;
import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
import com.ruoyi.chargingPile.api.model.TChargingGun;
import com.ruoyi.chargingPile.api.model.TFaultMessage;
import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode;
import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
import com.ruoyi.common.redis.service.RedisService;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.integration.drainage.TCECPushUtil;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter;
import com.ruoyi.integration.mongodb.service.*;
import com.ruoyi.integration.rocket.model.*;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
import com.ruoyi.order.api.vo.SecurityDetectionVO;
import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_charging_message",
        topic = "charge_charging_message",
        selectorExpression = "charging_message",
        consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> {
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Autowired
    private BmsAbortService bmsAbortService;
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    private OnlineService onlineService;
    @Autowired
    private PingService pingService;
    @Autowired
    private EndChargeService endChargeService;
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Autowired
    private MotorAbortService motorAbortService;
    @Autowired
    private BmsInformationService bmsInformationService;
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    @Autowired
    private TCECPushUtil tcecPushUtil;
    @Resource
    private ChargingPileClient chargingPileClient;
    @Resource
    private ChargingGunClient chargingGunClient;
    @Resource
    private RedisTemplate redisTemplate;
    @Autowired
    private EnhanceProduce enhanceProduce;
    @StreamListener("input")
    @Override
    protected void handleMessage(ChargingMessage message) throws Exception {
        log.info("rocket收到的消息内容:{}",message);
        String serviceId = message.getServiceId();
        if(!StringUtils.hasLength(serviceId)){
            return;
        }
        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
        switch (serviceId){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = message.getOnlineMessage();
                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
                // 持久化消息
                Online online = new Online();
                BeanUtils.copyProperties(onlineMessage,online);
                onlineService.create(online);
                break;
            case SendTagConstant.PING:
                PingMessage pingMessage = message.getPingMessage();
                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
                // 持久化消息
                Ping ping = new Ping();
                BeanUtils.copyProperties(pingMessage,ping);
                pingService.save(ping);
                //存储缓存中,5分钟有效
                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
                        vo1.setGun_code(pingMessage.getCharging_gun_code());
                        vo1.setPile_code(pingMessage.getCharging_pile_code());
                        vo1.setStatus(pingMessage.getCharging_gun_status());
                        chargingPileClient.updateChargingPileStatus(vo1);
                    }
                });
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
                log.info("充电结束-业务消息处理:{}",endChargeMessage);
                // 持久化消息
                EndCharge endCharge = new EndCharge();
                BeanUtils.copyProperties(endChargeMessage,endCharge);
                endChargeService.create(endCharge);
                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor1.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                        // 订单id
                        String transactionSerialNumber = endCharge.getTransaction_serial_number();
                        ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
                        chargingOrderMessage.setOrderNumber(transactionSerialNumber);
                        // 推送充电订单信息
                        ChargingMessage chargingMessage1 = new ChargingMessage();
                        chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
                        chargingMessage1.setOrderMessage(chargingOrderMessage);
                        enhanceProduce.orderInfoMessage(chargingMessage1);
                        // 推送充电订单状态
                        ChargingMessage chargingMessage2 = new ChargingMessage();
                        chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
                        chargingMessage2.setOrderMessage(chargingOrderMessage);
                        enhanceProduce.orderStatusMessage(chargingMessage2);
//                        try {
//                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
//                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
//                        }catch (Exception e){
//                            e.printStackTrace();
//                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
//package com.ruoyi.integration.rocket.produce;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
//import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
//import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
//import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient;
//import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
//import com.ruoyi.chargingPile.api.model.TChargingGun;
//import com.ruoyi.chargingPile.api.model.TFaultMessage;
//import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode;
//import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
//import com.ruoyi.common.redis.service.RedisService;
//import com.ruoyi.integration.api.model.*;
//import com.ruoyi.integration.drainage.TCECPushUtil;
//import com.ruoyi.integration.iotda.constant.SendTagConstant;
//import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
//import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter;
//import com.ruoyi.integration.mongodb.service.*;
//import com.ruoyi.integration.rocket.model.*;
//import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
//import com.ruoyi.order.api.feignClient.ChargingOrderClient;
//import com.ruoyi.order.api.model.TChargingOrder;
//import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
//import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
//import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
//import com.ruoyi.order.api.vo.SecurityDetectionVO;
//import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.rocketmq.spring.annotation.MessageModel;
//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
//import org.apache.rocketmq.spring.core.RocketMQListener;
//import org.springframework.beans.BeanUtils;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import javax.annotation.Resource;
//import java.math.RoundingMode;
//import java.time.LocalDateTime;
//import java.util.Date;
//import java.util.Objects;
//import java.util.Set;
//import java.util.concurrent.LinkedBlockingQueue;
//import java.util.concurrent.ThreadPoolExecutor;
//import java.util.concurrent.TimeUnit;
//
//@Slf4j
//@Component
//@RocketMQMessageListener(
//        messageModel = MessageModel.CLUSTERING,
//        consumerGroup = "charge_charging_message",
//        topic = "charge_charging_message",
//        selectorExpression = "charging_message",
//        consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
//)
//public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> {
//
//    @Autowired
//    private AcquisitionBillingModeService acquisitionBillingModeService;
//    @Autowired
//    private BillingModeVerifyService billingModeVerifyService;
//    @Autowired
//    private BmsAbortService bmsAbortService;
//    @Resource
//    private ChargingOrderClient chargingOrderClient;
//    @Autowired
//    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
//    @Autowired
//    private OnlineService onlineService;
//    @Autowired
//    private PingService pingService;
//    @Autowired
//    private EndChargeService endChargeService;
//    @Autowired
//    private ErrorMessageMessageService errorMessageMessageService;
//    @Autowired
//    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
//    @Resource
//    private AccountingStrategyDetailClient accountingStrategyDetailClient;
//    @Autowired
//    private ChargingHandshakeService chargingHandshakeService;
//    @Autowired
//    private ParameterSettingService parameterSettingService;
//    @Autowired
//    private MotorAbortService motorAbortService;
//    @Autowired
//    private BmsInformationService bmsInformationService;
//    @Autowired
//    private ChargingPileStartsChargingService chargingPileStartsChargingService;
//    @Autowired
//    private PlatformStartChargingReplyService platformStartChargingReplyService;
//    @Autowired
//    private PlatformStopChargingReplyService platformStopChargingReplyService;
//    @Autowired
//    private TransactionRecordService transactionRecordService;
//    @Autowired
//    private UpdateBalanceReplyService updateBalanceReplyService;
//    @Autowired
//    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
//    @Autowired
//    private ClearOfflineCardReplyService clearOfflineCardReplyService;
//    @Autowired
//    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
//    @Autowired
//    private TimingSettingService timingSettingService;
//    @Autowired
//    private SetupBillingModelReplyService setupBillingModelReplyService;
//    @Autowired
//    private GroundLockRealTimeDataService groundLockRealTimeDataService;
//    @Autowired
//    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
//    @Autowired
//    private PlatformRestartReplyService platformRestartReplyService;
//    @Autowired
//    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
//    @Autowired
//    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
//    @Autowired
//    private SecurityDetectionService securityDetectionService;
//    @Autowired
//    private TCECPushUtil tcecPushUtil;
//
//    @Resource
//    private ChargingPileClient chargingPileClient;
//    @Resource
//    private ChargingGunClient chargingGunClient;
//
//    @Resource
//    private RedisTemplate redisTemplate;
//
//    @Autowired
//    private EnhanceProduce enhanceProduce;
//
//
//
//    @Override
//    protected void handleMessage(ChargingMessage message) throws Exception {
//        log.info("rocket收到的消息内容:{}",message);
//        String serviceId = message.getServiceId();
//        if(!StringUtils.hasLength(serviceId)){
//            return;
//        }
//        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
//        switch (serviceId){
//            case SendTagConstant.ONLINE:
//                OnlineMessage onlineMessage = message.getOnlineMessage();
//                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
//                // 持久化消息
//                Online online = new Online();
//                BeanUtils.copyProperties(onlineMessage,online);
//                onlineService.create(online);
//                break;
//            case SendTagConstant.PING:
//                PingMessage pingMessage = message.getPingMessage();
//                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
//                // 持久化消息
//                Ping ping = new Ping();
//                BeanUtils.copyProperties(pingMessage,ping);
//                pingService.save(ping);
//                //存储缓存中,5分钟有效
//                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
//                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
//                        vo1.setGun_code(pingMessage.getCharging_gun_code());
//                        vo1.setPile_code(pingMessage.getCharging_pile_code());
//                        vo1.setStatus(pingMessage.getCharging_gun_status());
//                        chargingPileClient.updateChargingPileStatus(vo1);
//                    }
//                });
//                break;
//            case SendTagConstant.END_CHARGE:
//                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
//                log.info("充电结束-业务消息处理:{}",endChargeMessage);
//                // 持久化消息
//                EndCharge endCharge = new EndCharge();
//                BeanUtils.copyProperties(endChargeMessage,endCharge);
//                endChargeService.create(endCharge);
//                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor1.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
//                        // 订单id
//                        String transactionSerialNumber = endCharge.getTransaction_serial_number();
//                        ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
//                        chargingOrderMessage.setOrderNumber(transactionSerialNumber);
//                        // 推送充电订单信息
//                        ChargingMessage chargingMessage1 = new ChargingMessage();
//                        chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
//                        chargingMessage1.setOrderMessage(chargingOrderMessage);
//                        enhanceProduce.orderInfoMessage(chargingMessage1);
//                        // 推送充电订单状态
//                        ChargingMessage chargingMessage2 = new ChargingMessage();
//                        chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
//                        chargingMessage2.setOrderMessage(chargingOrderMessage);
//                        enhanceProduce.orderStatusMessage(chargingMessage2);
////                        try {
////                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
////                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
////                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
////                        }catch (Exception e){
////                            e.printStackTrace();
////                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
////                        }
//                    }
//                });
//                break;
//            case SendTagConstant.ERROR_MESSAGE:
//                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
//                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
//                // 持久化消息
//                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
//                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
//                errorMessageMessageService.create(errorMessageMessage);
//                break;
//            case SendTagConstant.BILLING_MODE_VERIFY:
//                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
//                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
//                // 持久化消息
//                BillingModeVerify billingModeVerify = new BillingModeVerify();
//                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
//                billingModeVerifyService.create(billingModeVerify);
//                break;
//            case SendTagConstant.ACQUISITION_BILLING_MODE:
//                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
//                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
//                // 持久化消息
//                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
//                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
//                acquisitionBillingModeService.create(acquisitionBillingMode);
//                break;
//            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
//                try {
//                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
//                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
//                    // 持久化消息
//                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
//                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
//                    // 查询mogondb上一条数据
//                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
//                    // 查询订单
//                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
//                    // 查询当前时间段的计费策略
//                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
//                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
//                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
//                    if (Objects.nonNull(data)) {
//                        uploadRealTimeMonitoringDataService.updateById(data.getId());
//                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
//                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
//                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
//                    }else {
//                        log.info("首次上传实时监测数据");
//                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
//                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
//                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
//                    }
//                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
//                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
//                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
////                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
////                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
//                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
//                    if(i == 0){
//                        log.error("数据存储mongo失败");
//                    }
//
//                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                    threadPoolExecutor2.execute(new Runnable() {
//                        @Override
//                        public void run() {
//                            // 业务处理
//                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
//                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
//                            chargingOrderClient.chargeMonitoring(query);
//                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
//                            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
//                            chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
//                            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
//                            // 推送充电订单信息
//                            ChargingMessage chargingMessage4 = new ChargingMessage();
//                            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
//                            chargingMessage4.setOrderMessage(chargingOrderMessage3);
//                            enhanceProduce.orderInfoMessage(chargingMessage4);
////                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
//                        }
                    }
                });
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
                // 持久化消息
                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
                errorMessageMessageService.create(errorMessageMessage);
                break;
            case SendTagConstant.BILLING_MODE_VERIFY:
                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
                // 持久化消息
                BillingModeVerify billingModeVerify = new BillingModeVerify();
                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
                billingModeVerifyService.create(billingModeVerify);
                break;
            case SendTagConstant.ACQUISITION_BILLING_MODE:
                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
                // 持久化消息
                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
                acquisitionBillingModeService.create(acquisitionBillingMode);
                break;
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                try {
                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
                    // 持久化消息
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
                    // 查询mogondb上一条数据
                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
                    // 查询订单
                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
                    // 查询当前时间段的计费策略
                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
                    if (Objects.nonNull(data)) {
                        uploadRealTimeMonitoringDataService.updateById(data.getId());
                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                    }else {
                        log.info("首次上传实时监测数据");
                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                    }
                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
//                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
//                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                    if(i == 0){
                        log.error("数据存储mongo失败");
                    }
                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                    threadPoolExecutor2.execute(new Runnable() {
                        @Override
                        public void run() {
                            // 业务处理
                            UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
                            BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
                            chargingOrderClient.chargeMonitoring(query);
                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
                            ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
                            chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
                            chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
                            // 推送充电订单信息
                            ChargingMessage chargingMessage4 = new ChargingMessage();
                            chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
                            chargingMessage4.setOrderMessage(chargingOrderMessage3);
                            enhanceProduce.orderInfoMessage(chargingMessage4);
//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
            case SendTagConstant.CHARGING_HANDSHAKE:
                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
                // 持久化消息
                ChargingHandshake chargingHandshake = new ChargingHandshake();
                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
                chargingHandshakeService.create(chargingHandshake);
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
                log.info("业务消息处理:{}",parameterSettingMessage);
                // 持久化消息
                ParameterSetting parameterSetting = new ParameterSetting();
                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
                parameterSettingService.create(parameterSetting);
                break;
            case SendTagConstant.BMS_ABORT:
                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
                // 持久化消息
                BmsAbort bmsAbort = new BmsAbort();
                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
                bmsAbortService.create(bmsAbort);
                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor3.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
                    }
                });
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
                // 持久化消息
                MotorAbort motorAbort = new MotorAbort();
                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                motorAbortService.create(motorAbort);
                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor4.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                    }
                });
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
                // 持久化消息
                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor5.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
                        if(Objects.nonNull(chargingOrderBms)){
                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
                        }
                    }
                });
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
                // 持久化消息
                BmsInformation bmsInformation = new BmsInformation();
                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
                bmsInformationService.create(bmsInformation);
                break;
            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
                // 持久化消息
                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
                chargingPileStartsChargingService.create(chargingPileStartsCharging);
                break;
            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
                // 持久化消息
                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                platformStartChargingReplyService.create(platformStartChargingReply);
                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor6.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO();
                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
                        chargingOrderClient.startChargeSuccessfully(message1);
                    }
                });
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
                // 持久化消息
                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                platformStopChargingReplyService.create(platformStopChargingReply);
                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor7.execute(new Runnable() {
                    @Override
                    public void run() {
                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
                    }
                });
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
                // 持久化消息
                TransactionRecord transactionRecord = new TransactionRecord();
                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                transactionRecordService.create(transactionRecord);
                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor8.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
                        if(Objects.nonNull(chargingOrderRecord)){
                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
                        }
                        //计算费用
                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                        BeanUtils.copyProperties(transactionRecordMessage,vo);
                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                        if(200 != code){
                            //失败后添加到队列中继续处理数据
                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
                        }
                    }
                });
                // 添加实时上传记录结束记录
                // 查询mogondb上一条数据
                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
                if(Objects.nonNull(data) && data.getStatus() != 5){
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
                    uploadRealTimeMonitoringData.setStatus(5);
                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                }
                break;
            case SendTagConstant.UPDATE_BALANCE_REPLY:
                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
                // 持久化消息
                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
                updateBalanceReplyService.create(updateBalanceReply);
                break;
            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
                // 持久化消息
                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
                break;
            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
                // 持久化消息
                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
                clearOfflineCardReplyService.create(clearOfflineCardReply);
                break;
            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
                // 持久化消息
                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
                workingParameterSettingReplyService.create(workingParameterSettingReply);
                break;
            case SendTagConstant.TIMING_SETTING:
                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
                // 持久化消息
                TimingSetting timingSetting = new TimingSetting();
                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
                timingSettingService.create(timingSetting);
                break;
            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
                // 持久化消息
                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
                setupBillingModelReplyService.create(setupBillingModelReply);
                break;
            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
                // 持久化消息
                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
                groundLockRealTimeDataService.create(groundLockRealTimeData);
                break;
            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
                // 持久化消息
                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
                break;
            case SendTagConstant.PLATFORM_RESTART_REPLY:
                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
                // 持久化消息
                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
                platformRestartReplyService.create(platformRestartReply);
                break;
            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
                break;
            case SendTagConstant.SECURITY_DETECTION:
                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
                SecurityDetection securityDetection = new SecurityDetection();
                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                securityDetectionService.create(securityDetection);
                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor9.execute(new Runnable() {
                    @Override
                    public void run() {
                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
                        BeanUtils.copyProperties(securityDetection, securityDetection1);
                        chargingOrderClient.securityDetection(securityDetection1);
                    }
                });
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
                break;
        }
    }
    @Override
    protected void handleMaxRetriesExceeded(ChargingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ChargingMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ChargingMessage message) {
        super.dispatchMessage(message);
    }
    /**
     * 处理未正常完成费用计算的订单
     */
    public void transactionRecord(){
        Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD);
        for (String member : members) {
            TransactionRecord transactionRecord = transactionRecordService.findOne(member);
            if(null == transactionRecord){
                redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
            }else{
                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                BeanUtils.copyProperties(transactionRecord, vo);
                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                if(200 == code){
                    redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
                }
            }
        }
    }
}
//                    });
//                } catch (Exception e) {
//                    e.printStackTrace();
//                }
//                break;
//            case SendTagConstant.CHARGING_HANDSHAKE:
//                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
//                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
//                // 持久化消息
//                ChargingHandshake chargingHandshake = new ChargingHandshake();
//                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
//                chargingHandshakeService.create(chargingHandshake);
//                break;
//            case SendTagConstant.PARAMETER_SETTING:
//                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
//                log.info("业务消息处理:{}",parameterSettingMessage);
//                // 持久化消息
//                ParameterSetting parameterSetting = new ParameterSetting();
//                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
//                parameterSettingService.create(parameterSetting);
//                break;
//            case SendTagConstant.BMS_ABORT:
//                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
//                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
//                // 持久化消息
//                BmsAbort bmsAbort = new BmsAbort();
//                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
//                bmsAbortService.create(bmsAbort);
//
//                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor3.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
//                    }
//                });
//                break;
//            case SendTagConstant.MOTOR_ABORT:
//                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
//                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
//                // 持久化消息
//                MotorAbort motorAbort = new MotorAbort();
//                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
//                motorAbortService.create(motorAbort);
//                ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor4.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
//                    }
//                });
//                break;
//            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
//                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
//                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
//                // 持久化消息
//                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
//                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
//                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
//                ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor5.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
//                        if(Objects.nonNull(chargingOrderBms)){
//                            chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
//                            chargingOrderClient.updateChargingOrder(chargingOrderBms);
//                        }
//                    }
//                });
//                break;
//            case SendTagConstant.BMS_INFORMATION:
//                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
//                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
//                // 持久化消息
//                BmsInformation bmsInformation = new BmsInformation();
//                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
//                bmsInformationService.create(bmsInformation);
//                break;
//            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
//                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
//                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
//                // 持久化消息
//                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
//                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
//                chargingPileStartsChargingService.create(chargingPileStartsCharging);
//                break;
//            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
//                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
//                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
//                // 持久化消息
//                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
//                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
//                platformStartChargingReplyService.create(platformStartChargingReply);
//                ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor6.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO();
//                        BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
//                        chargingOrderClient.startChargeSuccessfully(message1);
//                    }
//                });
//                break;
//            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
//                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
//                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
//                // 持久化消息
//                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
//                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
//                platformStopChargingReplyService.create(platformStopChargingReply);
//                ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor7.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
//                        BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
//                        chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
//                    }
//                });
//                break;
//            case SendTagConstant.TRANSACTION_RECORD:
//                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
//                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
//                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
//                // 持久化消息
//                TransactionRecord transactionRecord = new TransactionRecord();
//                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
//                transactionRecordService.create(transactionRecord);
//                ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor8.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        // 业务处理
//                        TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
//                        if(Objects.nonNull(chargingOrderRecord)){
//                            chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
//                            chargingOrderClient.updateChargingOrder(chargingOrderRecord);
//                        }
//                        //计算费用
//                        TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
//                        BeanUtils.copyProperties(transactionRecordMessage,vo);
//                        int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
//                        if(200 != code){
//                            //失败后添加到队列中继续处理数据
//                            redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
//                        }
//                    }
//                });
//
//
//                // 添加实时上传记录结束记录
//                // 查询mogondb上一条数据
//                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
//                if(Objects.nonNull(data) && data.getStatus() != 5){
//                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
//                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
//                    uploadRealTimeMonitoringData.setStatus(5);
//                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
//                }
//                break;
//            case SendTagConstant.UPDATE_BALANCE_REPLY:
//                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
//                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
//                // 持久化消息
//                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
//                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
//                updateBalanceReplyService.create(updateBalanceReply);
//                break;
//            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
//                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
//                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
//                // 持久化消息
//                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
//                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
//                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
//                break;
//            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
//                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
//                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
//                // 持久化消息
//                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
//                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
//                clearOfflineCardReplyService.create(clearOfflineCardReply);
//                break;
//            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
//                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
//                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
//                // 持久化消息
//                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
//                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
//                workingParameterSettingReplyService.create(workingParameterSettingReply);
//                break;
//            case SendTagConstant.TIMING_SETTING:
//                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
//                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
//                // 持久化消息
//                TimingSetting timingSetting = new TimingSetting();
//                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
//                timingSettingService.create(timingSetting);
//                break;
//            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
//                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
//                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
//                // 持久化消息
//                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
//                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
//                setupBillingModelReplyService.create(setupBillingModelReply);
//                break;
//            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
//                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
//                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
//                // 持久化消息
//                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
//                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
//                groundLockRealTimeDataService.create(groundLockRealTimeData);
//                break;
//            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
//                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
//                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
//                // 持久化消息
//                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
//                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
//                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
//                break;
//            case SendTagConstant.PLATFORM_RESTART_REPLY:
//                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
//                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
//                // 持久化消息
//                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
//                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
//                platformRestartReplyService.create(platformRestartReply);
//                break;
//            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
//                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
//                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
//                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
//                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
//                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
//                break;
//            case SendTagConstant.SECURITY_DETECTION:
//                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
//                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
//                SecurityDetection securityDetection = new SecurityDetection();
//                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
//                securityDetectionService.create(securityDetection);
//                ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor9.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
//                        BeanUtils.copyProperties(securityDetection, securityDetection1);
//                        chargingOrderClient.securityDetection(securityDetection1);
//                    }
//                });
//                break;
//            default:
//                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
//                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
//                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
//                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
//                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
//                break;
//        }
//    }
//
//    @Override
//    protected void handleMaxRetriesExceeded(ChargingMessage message) {
//        // 当超过指定重试次数消息时此处方法会被调用
//        // 生产中可以进行回退或其他业务操作
//        log.error("消息消费失败,请执行后续处理");
//    }
//
//
//    /**
//     * 是否执行重试机制
//     */
//    @Override
//    protected boolean isRetry() {
//        return true;
//    }
//
//    @Override
//    protected boolean throwException() {
//        // 是否抛出异常,false搭配retry自行处理异常
//        return false;
//    }
//
//    /**
//     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
//     * @param message 待处理消息
//     * @return true: 本次消息被过滤,false:不过滤
//     */
//    @Override
//    protected boolean filter(ChargingMessage message) {
//        // 此处可做消息过滤
//        return false;
//    }
//
//    /**
//     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
//     */
//    @Override
//    public void onMessage(ChargingMessage message) {
//        super.dispatchMessage(message);
//    }
//
//
//    /**
//     * 处理未正常完成费用计算的订单
//     */
//    public void transactionRecord(){
//        Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD);
//        for (String member : members) {
//            TransactionRecord transactionRecord = transactionRecordService.findOne(member);
//            if(null == transactionRecord){
//                redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
//            }else{
//                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
//                BeanUtils.copyProperties(transactionRecord, vo);
//                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
//                if(200 == code){
//                    redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member);
//                }
//            }
//        }
//    }
//}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
@@ -1,527 +1,522 @@
package com.ruoyi.integration.rocket.produce;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
import com.ruoyi.chargingPile.api.model.TChargingGun;
import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.integration.drainage.TCECPushUtil;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.mongodb.service.*;
import com.ruoyi.integration.rocket.model.*;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
import com.ruoyi.order.api.vo.SecurityDetectionVO;
import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.math.RoundingMode;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class ChargingMessageUtil {
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Autowired
    private BmsAbortService bmsAbortService;
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    private OnlineService onlineService;
    @Autowired
    private PingService pingService;
    @Autowired
    private EndChargeService endChargeService;
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Autowired
    private MotorAbortService motorAbortService;
    @Autowired
    private BmsInformationService bmsInformationService;
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    @Autowired
    private TCECPushUtil tcecPushUtil;
    @Resource
    private ChargingPileClient chargingPileClient;
    @Resource
    private ChargingGunClient chargingGunClient;
    @Resource
    private RedisTemplate redisTemplate;
    @Autowired
    private EnhanceProduce enhanceProduce;
    public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){
        log.info("rocket收到的消息内容:{}",message);
        String serviceId = message.getServiceId();
        if(!StringUtils.hasLength(serviceId)){
            return;
        }
        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
        switch (serviceId){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = message.getOnlineMessage();
                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
                // 持久化消息
                Online online = new Online();
                BeanUtils.copyProperties(onlineMessage,online);
                onlineService.create(online);
                break;
            case SendTagConstant.PING:
                PingMessage pingMessage = message.getPingMessage();
                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
                // 持久化消息
                Ping ping = new Ping();
                BeanUtils.copyProperties(pingMessage,ping);
                pingService.save(ping);
                //存储缓存中,5分钟有效
                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
                UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
                vo1.setGun_code(pingMessage.getCharging_gun_code());
                vo1.setPile_code(pingMessage.getCharging_pile_code());
                vo1.setStatus(pingMessage.getCharging_gun_status());
                chargingPileClient.updateChargingPileStatus(vo1);
                // 监管平台推送充电设备状态
                SendResult sendResult;
                String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code();
                ChargingMessage chargingMessage = new ChargingMessage();
                chargingMessage.setServiceId(SendTagConstant.GUN_STATUS);
                GunStatusMessage gunStatusMessage = new GunStatusMessage();
                gunStatusMessage.setFullNumber(gunCode);
                chargingMessage.setGunStatusMessage(gunStatusMessage);
                sendResult = enhanceProduce.gunStatusMessage(chargingMessage);
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
                log.info("充电结束-业务消息处理:{}",endChargeMessage);
                // 持久化消息
                EndCharge endCharge = new EndCharge();
                BeanUtils.copyProperties(endChargeMessage,endCharge);
                endChargeService.create(endCharge);
                // 业务处理
                chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                // 订单id
                String transactionSerialNumber = endCharge.getTransaction_serial_number();
                ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
                chargingOrderMessage.setOrderNumber(transactionSerialNumber);
                // 推送充电订单信息
                ChargingMessage chargingMessage1 = new ChargingMessage();
                chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
                chargingMessage1.setOrderMessage(chargingOrderMessage);
                enhanceProduce.orderInfoMessage(chargingMessage1);
                // 推送充电订单状态
                ChargingMessage chargingMessage2 = new ChargingMessage();
                chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
                chargingMessage2.setOrderMessage(chargingOrderMessage);
                enhanceProduce.orderStatusMessage(chargingMessage2);
//                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor1.execute(new Runnable() {
//package com.ruoyi.integration.rocket.produce;
//
//import com.alibaba.fastjson.JSONObject;
//import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
//import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
//import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
//import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
//import com.ruoyi.chargingPile.api.model.TChargingGun;
//import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo;
//import com.ruoyi.integration.api.model.*;
//import com.ruoyi.integration.drainage.TCECPushUtil;
//import com.ruoyi.integration.iotda.constant.SendTagConstant;
//import com.ruoyi.integration.mongodb.service.*;
//import com.ruoyi.integration.rocket.model.*;
//import com.ruoyi.order.api.feignClient.ChargingOrderClient;
//import com.ruoyi.order.api.model.TChargingOrder;
//import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
//import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO;
//import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO;
//import com.ruoyi.order.api.vo.SecurityDetectionVO;
//import com.ruoyi.order.api.vo.TransactionRecordMessageVO;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.rocketmq.client.producer.SendResult;
//import org.springframework.beans.BeanUtils;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import javax.annotation.Resource;
//import java.math.RoundingMode;
//import java.util.Objects;
//import java.util.Set;
//import java.util.concurrent.LinkedBlockingQueue;
//import java.util.concurrent.ThreadPoolExecutor;
//import java.util.concurrent.TimeUnit;
//
//
//@Slf4j
//@Component
//public class ChargingMessageUtil {
//
//    @Autowired
//    private AcquisitionBillingModeService acquisitionBillingModeService;
//    @Autowired
//    private BillingModeVerifyService billingModeVerifyService;
//    @Autowired
//    private BmsAbortService bmsAbortService;
//    @Resource
//    private ChargingOrderClient chargingOrderClient;
//    @Autowired
//    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
//    @Autowired
//    private OnlineService onlineService;
//    @Autowired
//    private PingService pingService;
//    @Autowired
//    private EndChargeService endChargeService;
//    @Autowired
//    private ErrorMessageMessageService errorMessageMessageService;
//    @Autowired
//    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
//    @Resource
//    private AccountingStrategyDetailClient accountingStrategyDetailClient;
//    @Autowired
//    private ChargingHandshakeService chargingHandshakeService;
//    @Autowired
//    private ParameterSettingService parameterSettingService;
//    @Autowired
//    private MotorAbortService motorAbortService;
//    @Autowired
//    private BmsInformationService bmsInformationService;
//    @Autowired
//    private ChargingPileStartsChargingService chargingPileStartsChargingService;
//    @Autowired
//    private PlatformStartChargingReplyService platformStartChargingReplyService;
//    @Autowired
//    private PlatformStopChargingReplyService platformStopChargingReplyService;
//    @Autowired
//    private TransactionRecordService transactionRecordService;
//    @Autowired
//    private UpdateBalanceReplyService updateBalanceReplyService;
//    @Autowired
//    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
//    @Autowired
//    private ClearOfflineCardReplyService clearOfflineCardReplyService;
//    @Autowired
//    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
//    @Autowired
//    private TimingSettingService timingSettingService;
//    @Autowired
//    private SetupBillingModelReplyService setupBillingModelReplyService;
//    @Autowired
//    private GroundLockRealTimeDataService groundLockRealTimeDataService;
//    @Autowired
//    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
//    @Autowired
//    private PlatformRestartReplyService platformRestartReplyService;
//    @Autowired
//    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
//    @Autowired
//    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
//    @Autowired
//    private SecurityDetectionService securityDetectionService;
//    @Autowired
//    private TCECPushUtil tcecPushUtil;
//
//    @Resource
//    private ChargingPileClient chargingPileClient;
//    @Resource
//    private ChargingGunClient chargingGunClient;
//
//    @Resource
//    private RedisTemplate redisTemplate;
//    @Autowired
//    private EnhanceProduce enhanceProduce;
//
//
//
//
//
//    public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){
//        log.info("rocket收到的消息内容:{}",message);
//        String serviceId = message.getServiceId();
//        if(!StringUtils.hasLength(serviceId)){
//            return;
//        }
//        log.info("rocket收到的消息内容:{}   {}", serviceId,message);
//        switch (serviceId){
//            case SendTagConstant.ONLINE:
//                OnlineMessage onlineMessage = message.getOnlineMessage();
//                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
//                // 持久化消息
//                Online online = new Online();
//                BeanUtils.copyProperties(onlineMessage,online);
//                onlineService.create(online);
//                break;
//            case SendTagConstant.PING:
//                PingMessage pingMessage = message.getPingMessage();
//                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
//                // 持久化消息
//                Ping ping = new Ping();
//                BeanUtils.copyProperties(pingMessage,ping);
//                pingService.save(ping);
//                //存储缓存中,5分钟有效
//                redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES);
//
//                UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo();
//                vo1.setGun_code(pingMessage.getCharging_gun_code());
//                vo1.setPile_code(pingMessage.getCharging_pile_code());
//                vo1.setStatus(pingMessage.getCharging_gun_status());
//                chargingPileClient.updateChargingPileStatus(vo1);
//                // 监管平台推送充电设备状态
//                SendResult sendResult;
//                String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code();
//                ChargingMessage chargingMessage = new ChargingMessage();
//                chargingMessage.setServiceId(SendTagConstant.GUN_STATUS);
//                GunStatusMessage gunStatusMessage = new GunStatusMessage();
//                gunStatusMessage.setFullNumber(gunCode);
//                chargingMessage.setGunStatusMessage(gunStatusMessage);
//                sendResult = enhanceProduce.gunStatusMessage(chargingMessage);
//
//                break;
//            case SendTagConstant.END_CHARGE:
//                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
//                log.info("充电结束-业务消息处理:{}",endChargeMessage);
//                // 持久化消息
//                EndCharge endCharge = new EndCharge();
//                BeanUtils.copyProperties(endChargeMessage,endCharge);
//                endChargeService.create(endCharge);
//                // 业务处理
//                chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
//                // 订单id
//                String transactionSerialNumber = endCharge.getTransaction_serial_number();
//                ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage();
//                chargingOrderMessage.setOrderNumber(transactionSerialNumber);
//                // 推送充电订单信息
//                ChargingMessage chargingMessage1 = new ChargingMessage();
//                chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO);
//                chargingMessage1.setOrderMessage(chargingOrderMessage);
//                enhanceProduce.orderInfoMessage(chargingMessage1);
//                // 推送充电订单状态
//                ChargingMessage chargingMessage2 = new ChargingMessage();
//                chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS);
//                chargingMessage2.setOrderMessage(chargingOrderMessage);
//                enhanceProduce.orderStatusMessage(chargingMessage2);
////                ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
////                threadPoolExecutor1.execute(new Runnable() {
////                    @Override
////                    public void run() {
////                        try {
////                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
////                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
////                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
////                        }catch (Exception e){
////                            e.printStackTrace();
////                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
////                        }
////                    }
////                });
//                break;
//            case SendTagConstant.ERROR_MESSAGE:
//                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
//                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
//                // 持久化消息
//                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
//                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
//                errorMessageMessageService.create(errorMessageMessage);
//                break;
//            case SendTagConstant.BILLING_MODE_VERIFY:
//                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
//                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
//                // 持久化消息
//                BillingModeVerify billingModeVerify = new BillingModeVerify();
//                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
//                billingModeVerifyService.create(billingModeVerify);
//                break;
//            case SendTagConstant.ACQUISITION_BILLING_MODE:
//                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
//                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
//                // 持久化消息
//                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
//                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
//                acquisitionBillingModeService.create(acquisitionBillingMode);
//                break;
//            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
//                try {
//                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
//                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
//                    // 持久化消息
//                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
//                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
//                    // 查询mogondb上一条数据
//                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
//                    // 查询订单
//                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
//                    // 查询当前时间段的计费策略
//                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
//                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
//                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
//                    if (Objects.nonNull(data)) {
//                        uploadRealTimeMonitoringDataService.updateById(data.getId());
//                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
//                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
//                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
//                    }else {
//                        log.info("首次上传实时监测数据");
//                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
//                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
//                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
//                    }
//                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
//                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
//                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
////                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
////                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
//                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
//                    if(i == 0){
//                        log.error("数据存储mongo失败");
//                    }
//
//                    // 业务处理
//                    UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
//                    BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
//                    chargingOrderClient.chargeMonitoring(query);
//                    // 订单id
//                    ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
//                    chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
//                    // 推送充电订单信息
//                    ChargingMessage chargingMessage4 = new ChargingMessage();
//                    chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
//                    chargingMessage4.setOrderMessage(chargingOrderMessage3);
//                    enhanceProduce.orderInfoMessage(chargingMessage4);
////                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
////                    threadPoolExecutor2.execute(new Runnable() {
////                        @Override
////                        public void run() {
////                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
////
////                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
////                        }
////                    });
//                } catch (Exception e) {
//                    e.printStackTrace();
//                }
//                break;
//            case SendTagConstant.CHARGING_HANDSHAKE:
//                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
//                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
//                // 持久化消息
//                ChargingHandshake chargingHandshake = new ChargingHandshake();
//                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
//                chargingHandshakeService.create(chargingHandshake);
//                break;
//            case SendTagConstant.PARAMETER_SETTING:
//                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
//                log.info("业务消息处理:{}",parameterSettingMessage);
//                // 持久化消息
//                ParameterSetting parameterSetting = new ParameterSetting();
//                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
//                parameterSettingService.create(parameterSetting);
//                break;
//            case SendTagConstant.BMS_ABORT:
//                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
//                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
//                // 持久化消息
//                BmsAbort bmsAbort = new BmsAbort();
//                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
//                bmsAbortService.create(bmsAbort);
//
//                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                threadPoolExecutor3.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        try {
//                            TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData();
//                            tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder);
//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
//                        }catch (Exception e){
//                            e.printStackTrace();
//                            System.out.println("充电结束推送监管平台失败:"+e.getMessage());
//                        }
//                        // 业务处理
//                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
//                    }
//                });
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
                // 持久化消息
                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
                errorMessageMessageService.create(errorMessageMessage);
                break;
            case SendTagConstant.BILLING_MODE_VERIFY:
                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
                // 持久化消息
                BillingModeVerify billingModeVerify = new BillingModeVerify();
                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
                billingModeVerifyService.create(billingModeVerify);
                break;
            case SendTagConstant.ACQUISITION_BILLING_MODE:
                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
                // 持久化消息
                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
                acquisitionBillingModeService.create(acquisitionBillingMode);
                break;
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                try {
                    UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
                    log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
                    // 持久化消息
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                    BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
                    // 查询mogondb上一条数据
                    UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
                    // 查询订单
                    TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
                    // 查询当前时间段的计费策略
                    TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
                    uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
                    uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
                    if (Objects.nonNull(data)) {
                        uploadRealTimeMonitoringDataService.updateById(data.getId());
                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount()));
                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree()));
                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                    }else {
                        log.info("首次上传实时监测数据");
                        uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
                        uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
                        uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                    }
                    uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType());
                    uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId());
                    uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus());
//                    uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime());
//                    uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime());
                    int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                    if(i == 0){
                        log.error("数据存储mongo失败");
                    }
                    // 业务处理
                    UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
                    BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
                    chargingOrderClient.chargeMonitoring(query);
                    // 订单id
                    ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage();
                    chargingOrderMessage3.setOrderNumber(chargingOrder.getCode());
                    // 推送充电订单信息
                    ChargingMessage chargingMessage4 = new ChargingMessage();
                    chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS);
                    chargingMessage4.setOrderMessage(chargingOrderMessage3);
                    enhanceProduce.orderInfoMessage(chargingMessage4);
//                    ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//                    threadPoolExecutor2.execute(new Runnable() {
//                        @Override
//                        public void run() {
//                            chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+"");
//                break;
//            case SendTagConstant.MOTOR_ABORT:
//                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
//                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
//                // 持久化消息
//                MotorAbort motorAbort = new MotorAbort();
//                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
//                motorAbortService.create(motorAbort);
//                // 业务处理
//                chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
//                break;
//            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
//                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
//                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
//                // 持久化消息
//                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
//                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
//                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
//
//                            tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder);
//                        }
//                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
            case SendTagConstant.CHARGING_HANDSHAKE:
                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
                // 持久化消息
                ChargingHandshake chargingHandshake = new ChargingHandshake();
                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
                chargingHandshakeService.create(chargingHandshake);
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
                log.info("业务消息处理:{}",parameterSettingMessage);
                // 持久化消息
                ParameterSetting parameterSetting = new ParameterSetting();
                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
                parameterSettingService.create(parameterSetting);
                break;
            case SendTagConstant.BMS_ABORT:
                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
                // 持久化消息
                BmsAbort bmsAbort = new BmsAbort();
                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
                bmsAbortService.create(bmsAbort);
                ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                threadPoolExecutor3.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 业务处理
                        chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
                    }
                });
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
                // 持久化消息
                MotorAbort motorAbort = new MotorAbort();
                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                motorAbortService.create(motorAbort);
                // 业务处理
                chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
                // 持久化消息
                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
                // 业务处理
                TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
                if(Objects.nonNull(chargingOrderBms)){
                    chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
                    chargingOrderClient.updateChargingOrder(chargingOrderBms);
                }
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
                // 持久化消息
                BmsInformation bmsInformation = new BmsInformation();
                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
                bmsInformationService.create(bmsInformation);
                break;
            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
                // 持久化消息
                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
                chargingPileStartsChargingService.create(chargingPileStartsCharging);
                break;
            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
                // 持久化消息
                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                platformStartChargingReplyService.create(platformStartChargingReply);
                // 业务处理
                PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
                BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
                chargingOrderClient.startChargeSuccessfully(message1);
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
                // 持久化消息
                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                platformStopChargingReplyService.create(platformStopChargingReply);
                PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
                BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
                chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
                // 持久化消息
                TransactionRecord transactionRecord = new TransactionRecord();
                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                transactionRecordService.create(transactionRecord);
                // 业务处理
                TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
                if(Objects.nonNull(chargingOrderRecord)){
                    chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
                    chargingOrderClient.updateChargingOrder(chargingOrderRecord);
                }
                //计算费用
                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
                BeanUtils.copyProperties(transactionRecordMessage,vo);
                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
                if(200 != code){
                    //失败后添加到队列中继续处理数据
                    redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
                }
                // 添加实时上传记录结束记录
                // 查询mogondb上一条数据
                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
                if(Objects.nonNull(data) && data.getStatus() != 5){
                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
                    uploadRealTimeMonitoringData.setStatus(5);
                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                }
                break;
            case SendTagConstant.UPDATE_BALANCE_REPLY:
                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
                // 持久化消息
                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
                updateBalanceReplyService.create(updateBalanceReply);
                break;
            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
                // 持久化消息
                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
                break;
            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
                // 持久化消息
                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
                clearOfflineCardReplyService.create(clearOfflineCardReply);
                break;
            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
                // 持久化消息
                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
                workingParameterSettingReplyService.create(workingParameterSettingReply);
                break;
            case SendTagConstant.TIMING_SETTING:
                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
                // 持久化消息
                TimingSetting timingSetting = new TimingSetting();
                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
                timingSettingService.create(timingSetting);
                break;
            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
                // 持久化消息
                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
                setupBillingModelReplyService.create(setupBillingModelReply);
                break;
            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
                // 持久化消息
                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
                groundLockRealTimeDataService.create(groundLockRealTimeData);
                break;
            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
                // 持久化消息
                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
                break;
            case SendTagConstant.PLATFORM_RESTART_REPLY:
                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
                // 持久化消息
                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
                platformRestartReplyService.create(platformRestartReply);
                break;
            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
                break;
            case SendTagConstant.SECURITY_DETECTION:
                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
                SecurityDetection securityDetection = new SecurityDetection();
                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                securityDetectionService.create(securityDetection);
                SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
                BeanUtils.copyProperties(securityDetection, securityDetection1);
                chargingOrderClient.securityDetection(securityDetection1);
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
                break;
        }
    }
}
//                // 业务处理
//                TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
//                if(Objects.nonNull(chargingOrderBms)){
//                    chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
//                    chargingOrderClient.updateChargingOrder(chargingOrderBms);
//                }
//                break;
//            case SendTagConstant.BMS_INFORMATION:
//                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
//                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
//                // 持久化消息
//                BmsInformation bmsInformation = new BmsInformation();
//                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
//                bmsInformationService.create(bmsInformation);
//                break;
//            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
//                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
//                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
//                // 持久化消息
//                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
//                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
//                chargingPileStartsChargingService.create(chargingPileStartsCharging);
//                break;
//            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
//                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
//                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
//                // 持久化消息
//                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
//                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
//                platformStartChargingReplyService.create(platformStartChargingReply);
//
//                // 业务处理
//                PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO();
//                BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
//                chargingOrderClient.startChargeSuccessfully(message1);
//                break;
//            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
//                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
//                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
//                // 持久化消息
//                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
//                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
//                platformStopChargingReplyService.create(platformStopChargingReply);
//
//                PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO();
//                BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1);
//                chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1);
//                break;
//            case SendTagConstant.TRANSACTION_RECORD:
//                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
//                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
//                transactionRecordMessage.setResult(JSONObject.toJSONString(message));
//                // 持久化消息
//                TransactionRecord transactionRecord = new TransactionRecord();
//                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
//                transactionRecordService.create(transactionRecord);
//
//                // 业务处理
//                TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
//                if(Objects.nonNull(chargingOrderRecord)){
//                    chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
//                    chargingOrderClient.updateChargingOrder(chargingOrderRecord);
//                }
//                //计算费用
//                TransactionRecordMessageVO vo = new TransactionRecordMessageVO();
//                BeanUtils.copyProperties(transactionRecordMessage,vo);
//                int code = chargingOrderClient.endChargeBillingCharge(vo).getCode();
//                if(200 != code){
//                    //失败后添加到队列中继续处理数据
//                    redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number());
//                }
//
//
//                // 添加实时上传记录结束记录
//                // 查询mogondb上一条数据
//                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number());
//                if(Objects.nonNull(data) && data.getStatus() != 5){
//                    UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
//                    BeanUtils.copyProperties(data,uploadRealTimeMonitoringData);
//                    uploadRealTimeMonitoringData.setStatus(5);
//                    uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
//                }
//                break;
//            case SendTagConstant.UPDATE_BALANCE_REPLY:
//                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
//                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
//                // 持久化消息
//                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
//                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
//                updateBalanceReplyService.create(updateBalanceReply);
//                break;
//            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
//                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
//                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
//                // 持久化消息
//                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
//                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
//                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
//                break;
//            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
//                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
//                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
//                // 持久化消息
//                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
//                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
//                clearOfflineCardReplyService.create(clearOfflineCardReply);
//                break;
//            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
//                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
//                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
//                // 持久化消息
//                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
//                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
//                workingParameterSettingReplyService.create(workingParameterSettingReply);
//                break;
//            case SendTagConstant.TIMING_SETTING:
//                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
//                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
//                // 持久化消息
//                TimingSetting timingSetting = new TimingSetting();
//                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
//                timingSettingService.create(timingSetting);
//                break;
//            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
//                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
//                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
//                // 持久化消息
//                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
//                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
//                setupBillingModelReplyService.create(setupBillingModelReply);
//                break;
//            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
//                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
//                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
//                // 持久化消息
//                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
//                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
//                groundLockRealTimeDataService.create(groundLockRealTimeData);
//                break;
//            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
//                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
//                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
//                // 持久化消息
//                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
//                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
//                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
//                break;
//            case SendTagConstant.PLATFORM_RESTART_REPLY:
//                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
//                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
//                // 持久化消息
//                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
//                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
//                platformRestartReplyService.create(platformRestartReply);
//                break;
//            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
//                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
//                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
//                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
//                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
//                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
//                break;
//            case SendTagConstant.SECURITY_DETECTION:
//                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
//                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
//                SecurityDetection securityDetection = new SecurityDetection();
//                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
//                securityDetectionService.create(securityDetection);
//
//                SecurityDetectionVO securityDetection1 = new SecurityDetectionVO();
//                BeanUtils.copyProperties(securityDetection, securityDetection1);
//                chargingOrderClient.securityDetection(securityDetection1);
//                break;
//            default:
//                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
//                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
//                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
//                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
//                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
//                break;
//        }
//    }
//
//
//}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java
@@ -64,7 +64,7 @@
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java
@@ -30,10 +30,12 @@
import com.ruoyi.other.api.domain.Operator;
import com.ruoyi.other.api.feignClient.OperatorClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cglib.core.Local;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
@@ -50,395 +52,5 @@
@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RuoYiIntegrationApplication.class)
public class RuoYiIntegrationApplicationTests {
    @Resource
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Resource
    private ParkingOrderService parkingOrderService;
    @Resource
    private SiteClient siteClient;
    @Resource
    private ChargingGunClient chargingGunClient;
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Resource
    private OperatorClient operatorClient;
    @Test
    public void test(){
        /**
         * 推送充电设备接口状态信息
         * @param chargingGun
         * @return
         */
        List<TChargingGun> data = chargingGunClient.getAllGun().getData();
        TChargingGun chargingGun = data.stream().filter(e -> e.getId() == 61).findFirst().orElse(new TChargingGun());
        ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo();
            connectorStatusInfo.setOperatorID("906171535");
            connectorStatusInfo.setEquipmentOwnerID("906171535");
            connectorStatusInfo.setStationID(String.valueOf(chargingGun.getSiteId()));
            connectorStatusInfo.setEquipmentID(String.valueOf(chargingGun.getChargingPileId()));
            connectorStatusInfo.setConnectorID(chargingGun.getFullNumber());
            connectorStatusInfo.setEquipmentClassification(1);
            switch (chargingGun.getStatus()){
                case 1:
                    connectorStatusInfo.setStatus(0);
                    break;
                case 2:
                    connectorStatusInfo.setStatus(1);
                    break;
                case 3:
                    connectorStatusInfo.setStatus(2);
                    break;
                case 4:
                    connectorStatusInfo.setStatus(3);
                    break;
                case 5:
                    connectorStatusInfo.setStatus(3);
                    break;
                case 6:
                    connectorStatusInfo.setStatus(4);
                    break;
                case 7:
                    connectorStatusInfo.setStatus(255);
                    break;
            }
            connectorStatusInfo.setUpdateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
//            List<Operator> operators = operatorClient.getAllOperator().getData();
//            for (Operator operator : operators) {
                tcecSuperviseUtil.notificationStationStatus(new Operator(), connectorStatusInfo);
//            }
    }
    @Autowired
    private TCECSuperviseUtil tcecSuperviseUtil;
    private final static String operatorId = "906171535";
    @Test
    public void test1(){
        TChargingOrder chargingOrder = chargingOrderClient.orderDetail(1884874763556048898L).getData();
        SupEquipChargeStatus supEquipChargeStatus = new SupEquipChargeStatus();
        supEquipChargeStatus.setOperatorID(operatorId);
        supEquipChargeStatus.setEquipmentOwnerID(operatorId);
        supEquipChargeStatus.setStationID(String.valueOf(chargingOrder.getSiteId()));
        supEquipChargeStatus.setEquipmentID(String.valueOf(chargingOrder.getChargingPileId()));
        supEquipChargeStatus.setOrderNo(operatorId+chargingOrder.getCode());
        switch (chargingOrder.getStatus()){
            case 2:
                supEquipChargeStatus.setConnectorStatus(1);
                break;
            case 3:
                supEquipChargeStatus.setConnectorStatus(2);
                break;
            case 4:
                supEquipChargeStatus.setConnectorStatus(3);
                break;
            case 5:
                supEquipChargeStatus.setConnectorStatus(4);
                break;
        }
        TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData();
        supEquipChargeStatus.setConnectorID(chargingGun.getFullNumber());
        supEquipChargeStatus.setEquipmentClassification(1);
        supEquipChargeStatus.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        switch (chargingGun.getStatus()){
            case 1:
                supEquipChargeStatus.setConnectorStatus(0);
                break;
            case 2:
                supEquipChargeStatus.setConnectorStatus(1);
                break;
            case 3:
                supEquipChargeStatus.setConnectorStatus(2);
                break;
            case 4:
                supEquipChargeStatus.setConnectorStatus(3);
                break;
            case 5:
                supEquipChargeStatus.setConnectorStatus(3);
                break;
            case 6:
                supEquipChargeStatus.setConnectorStatus(4);
                break;
            case 7:
                supEquipChargeStatus.setConnectorStatus(255);
                break;
        }
        supEquipChargeStatus.setCurrentA(chargingOrder.getCurrent());
        supEquipChargeStatus.setSOC(StringUtils.hasLength(chargingOrder.getEndSoc())?new BigDecimal(chargingOrder.getEndSoc()):new BigDecimal("1"));
        supEquipChargeStatus.setStartTime(chargingOrder.getStartTime() != null ? chargingOrder.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : "");
        supEquipChargeStatus.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        supEquipChargeStatus.setTotalPower(chargingOrder.getElectrovalence());
        tcecSuperviseUtil.notificationSupEquipChargeStatus(new Operator(), supEquipChargeStatus);
    }
    @Test
    public void test2(){
        TChargingOrder chargingOrder = chargingOrderClient.orderDetail(1884874763556048898L).getData();
        SupChargeOrderInfo supChargeOrderInfo = new SupChargeOrderInfo();
        supChargeOrderInfo.setOperatorID(operatorId);
        supChargeOrderInfo.setEquipmentOwnerID(operatorId);
        supChargeOrderInfo.setStationID(String.valueOf(chargingOrder.getSiteId()));
        supChargeOrderInfo.setEquipmentID(String.valueOf(chargingOrder.getChargingPileId()));
        supChargeOrderInfo.setOrderNo(operatorId+chargingOrder.getCode());
        TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData();
        supChargeOrderInfo.setConnectorID(chargingGun.getFullNumber());
        supChargeOrderInfo.setEquipmentClassification(1);
        supChargeOrderInfo.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        supChargeOrderInfo.setStartTime(chargingOrder.getStartTime() != null ? chargingOrder.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : "");
        supChargeOrderInfo.setEndTime(chargingOrder.getEndTime() != null ? chargingOrder.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : "");
        supChargeOrderInfo.setTotalPower(chargingOrder.getElectrovalence());
        supChargeOrderInfo.setTotalElecMoney(chargingOrder.getElectrovalence());
        supChargeOrderInfo.setTotalServiceMoney(chargingOrder.getServiceCharge());
        supChargeOrderInfo.setTotalMoney(chargingOrder.getOrderAmount());
        supChargeOrderInfo.setOrderStatus(chargingOrder.getStatus());
        switch (chargingOrder.getEndMode()){
            case 0:
                supChargeOrderInfo.setStopReason(5);
                supChargeOrderInfo.setStopDesc("异常终止");
                break;
            case 1:
                supChargeOrderInfo.setStopReason(0);
                supChargeOrderInfo.setStopDesc("用户手动停止充电");
                break;
            case 2:
                supChargeOrderInfo.setStopReason(1);
                supChargeOrderInfo.setStopDesc("客户归属地运营商平台停止充电");
                break;
            case 3:
                supChargeOrderInfo.setStopReason(1);
                supChargeOrderInfo.setStopDesc("费用不足中止");
                break;
        }
        tcecSuperviseUtil.notificationChargeOrderInfo(new Operator(), supChargeOrderInfo);
    }
    @Test
    public void test3(){
        StationStatsInfoResult res = new StationStatsInfoResult();
        List<Site> data = siteClient.getSiteAll().getData();
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime startLocalDateTime = now.minusDays(1);
        LocalDateTime endLocalDateTime = now.minusDays(1);
        LocalDateTime localDateTime1 = LocalDateTime.of(2025, 2, 3, 0, 0, 0);
        LocalDateTime localDateTime2 = LocalDateTime.of(2025, 2, 3, 23, 59, 59);
        // 获取今天凌晨
        startLocalDateTime.withHour(0);
        startLocalDateTime.withMinute(0);
        startLocalDateTime.withSecond(0);
        startLocalDateTime.withMonth(1);
        startLocalDateTime.withDayOfMonth(28);
        String start = DateUtils.localDateTimeToString(startLocalDateTime);
        endLocalDateTime.withHour(23);
        endLocalDateTime.withMinute(59);
        endLocalDateTime.withSecond(59);
        startLocalDateTime.withMonth(1);
        startLocalDateTime.withDayOfMonth(28);
        String end = DateUtils.localDateTimeToString(endLocalDateTime);
        ChargingStatisticeDTO chargingStatisticeDTO = new ChargingStatisticeDTO();
        chargingStatisticeDTO.setStartTime(localDateTime1);
        chargingStatisticeDTO.setEndTime(localDateTime2);
        List<TChargingOrder> data1 = chargingOrderClient.getChargingStatistics(chargingStatisticeDTO).getData();
        List<StationStatsInfo> stationStatsInfos = new ArrayList<>();
        String start1 = DateUtils.localDateTimeToString(localDateTime1);
        String start2 = DateUtils.localDateTimeToString(localDateTime2);
        for (Site datum : data) {
            StationStatsInfo stationStatsInfo = new StationStatsInfo();
            stationStatsInfo.setStationID(datum.getId().toString());
            stationStatsInfo.setEquipmentOwnerID("906171535");
            stationStatsInfo.setOperatorID("906171535");
            stationStatsInfo.setStationClassification(1);
            stationStatsInfo.setStartTime(start1);
            stationStatsInfo.setEndTime(start2);
            List<TChargingOrder> chargingOrders = data1.stream().filter(e -> e.getSiteId().equals(datum.getId())).collect(Collectors.toList());
            // 充电电量
            BigDecimal electricity = new BigDecimal("0");
            int chargingCount = 0;
            for (TChargingOrder chargingOrder : chargingOrders) {
                if (chargingOrder.getElectricity()!=null){
                    electricity = electricity.add(chargingOrder.getElectricity());
                    chargingCount++;
                }
            }
            stationStatsInfo.setStationElectricity(electricity.divide(new BigDecimal("24"),4, BigDecimal.ROUND_DOWN));
            stationStatsInfo.setStationTotalChargeEnergy(electricity.setScale(4, BigDecimal.ROUND_DOWN));
            stationStatsInfo.setStationTotalWarningNum(0);
            stationStatsInfo.setStationTotalOtherEnergy(new BigDecimal("0"));
            stationStatsInfo.setStationTotalChargeNum(chargingCount);
            //构建设备统计数据
            List<EquipmentStatsInfo> EquipmentStatsInfos = new ArrayList<>();
            Map<Integer, List<TChargingOrder>> collect = chargingOrders.stream().collect(Collectors.groupingBy(TChargingOrder::getChargingPileId));
            for (Integer integer : collect.keySet()) {
                List<TChargingOrder> tChargingOrders = collect.get(integer);
                BigDecimal reduce1 = tChargingOrders.stream().map(TChargingOrder::getChargingCapacity).reduce(BigDecimal.ZERO, BigDecimal::add);
                EquipmentStatsInfo equipmentStatsInfo = new EquipmentStatsInfo();
                equipmentStatsInfo.setEquipmentClassification(1);
                long chargingTime = 0L;
                for (TChargingOrder tChargingOrder : tChargingOrders) {
                    // 累加充电时长
                    LocalDateTime startTime = tChargingOrder.getStartTime();
                    LocalDateTime endTime = tChargingOrder.getEndTime();
                    // 计算时间差 单位分钟
                    chargingTime += ChronoUnit.SECONDS.between(startTime, endTime)/60;
                }
                equipmentStatsInfo.setEquipmentTotalChargeTime(chargingTime);
                equipmentStatsInfo.setEquipmentTotalChargeNum(tChargingOrders.size());
                equipmentStatsInfo.setEquipmentTotalWarningNum(0);
                equipmentStatsInfo.setEquipmentID(integer.toString());
                equipmentStatsInfo.setEquipmentElectricity(reduce1);
                //构建设备接口统计数据
                Map<Integer, List<TChargingOrder>> collect2 = tChargingOrders.stream().collect(Collectors.groupingBy(TChargingOrder::getChargingGunId));
                List<ConnectorStatsInfo> ConnectorStatsInfos = new ArrayList<>();
                for (Integer integer1 : collect2.keySet()) {
                    List<TChargingOrder> tChargingOrders1 = collect2.get(integer1);
                    BigDecimal reduce2 = tChargingOrders1.stream().map(TChargingOrder::getChargingCapacity).reduce(BigDecimal.ZERO, BigDecimal::add);
                    long chargingTime1 = 0L;
                    for (TChargingOrder chargingOrder : tChargingOrders1) {
                        // 累加充电时长
                        LocalDateTime startTime = chargingOrder.getStartTime();
                        LocalDateTime endTime = chargingOrder.getEndTime();
                        // 计算时间差 单位分钟
                        chargingTime += ChronoUnit.SECONDS.between(startTime, endTime)/60;
                    }
                    TChargingGun chargingGun = chargingGunClient.getChargingGunById(integer1).getData();
                    ConnectorStatsInfo connectorStatsInfo = new ConnectorStatsInfo();
                    connectorStatsInfo.setConnectorID(chargingGun.getFullNumber());
                    connectorStatsInfo.setConnectorElectricity(reduce2);
                    connectorStatsInfo.setConnectorTotalChargeTime(Integer.valueOf(chargingTime+""));
                    connectorStatsInfo.setConnectorTotalChargeNum(tChargingOrders1.size());
                    connectorStatsInfo.setConnectorTotalWarningNum(0);
                    ConnectorStatsInfos.add(connectorStatsInfo);
                }
                equipmentStatsInfo.setConnectorStatsInfos(ConnectorStatsInfos);
                EquipmentStatsInfos.add(equipmentStatsInfo);
            }
            stationStatsInfo.setEquipmentStatsInfos(EquipmentStatsInfos);
            stationStatsInfos.add(stationStatsInfo);
        }
        res.setStationStatsInfos(stationStatsInfos);
        tcecSuperviseUtil.superviseNotificationOperationStatsInfo(res);
    }
    @Resource
    private ChargingPileClient chargingPileClient;
    @Test
    public void test4(){
        List<Site> data = siteClient.getSiteAll().getData();
        List<Integer> siteIds = data.stream().map(Site::getId).collect(Collectors.toList());
        List<TChargingPile> tChargingPiles = chargingPileClient.getChargingPileBySiteIds(siteIds).getData();
        List<Integer> collect1 = tChargingPiles.stream().map(TChargingPile::getId).collect(Collectors.toList());
        List<TChargingGun> chargingGunList = chargingGunClient.getChargingGunByChargingPileIds(collect1).getData();
        SupStationPowerInfoResult supStationPowerInfoResult = new SupStationPowerInfoResult();
        List<SupStationPowerInfo> stationStatsInfos = new ArrayList<>();
        LocalDateTime now = LocalDateTime.of(2025, 2, 3, 23, 30, 0);
        LocalDateTime startLocalDateTime = LocalDateTime.of(2025, 2, 3, 20, 0, 0);
        ChargingStatisticeDTO chargingStatisticeDTO = new ChargingStatisticeDTO();
        chargingStatisticeDTO.setStartTime(startLocalDateTime);
        chargingStatisticeDTO.setEndTime(now);
        List<TChargingOrder> data1 = chargingOrderClient.getChargingStatistics(chargingStatisticeDTO).getData();
        for (Site datum : data) {
            List<TChargingOrder> collect = data1.stream().filter(e -> e.getSiteId().equals(datum.getId())
                    &&e.getChargingPower()!=null).collect(Collectors.toList());
            SupStationPowerInfo supStationPowerInfo = new SupStationPowerInfo();
            supStationPowerInfo.setOperatorID("906171535");
            supStationPowerInfo.setEquipmentOwnerID("906171535");
            supStationPowerInfo.setStationID(datum.getId().toString());
            supStationPowerInfo.setStationClassification(1);
            supStationPowerInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now()));
            if (collect.isEmpty()){
                supStationPowerInfo.setStationRealTimePower(new BigDecimal("0"));
            }else{
                BigDecimal divide = collect.stream().map(TChargingOrder::getChargingPower).reduce(BigDecimal.ZERO,BigDecimal::add).divide(new BigDecimal(collect.size()),4,BigDecimal.ROUND_DOWN);
                supStationPowerInfo.setStationRealTimePower(divide);
            }
            supStationPowerInfo.setEquipmentPowerInfos(buildEquipmentPowerInfo(datum.getId(), tChargingPiles, chargingGunList));
            stationStatsInfos.add(supStationPowerInfo);
        }
        supStationPowerInfoResult.setSupStationPowerInfos(stationStatsInfos);
        tcecSuperviseUtil.superviseNotificationRealtimePowerInfo(supStationPowerInfoResult);
    }
    /**
     * 构建桩数据
     * @param tChargingPiles
     * @return
     */
    public List<SupEquipmentPowerInfo> buildEquipmentPowerInfo(Integer siteId, List<TChargingPile> tChargingPiles, List<TChargingGun> chargingGunList){
        List<SupEquipmentPowerInfo> equipmentInfos = new ArrayList<>();
        List<TChargingPile> collect = tChargingPiles.stream().filter(s -> s.getSiteId().equals(siteId)).collect(Collectors.toList());
        for (TChargingPile tChargingPile : collect) {
            SupEquipmentPowerInfo equipmentInfo = new SupEquipmentPowerInfo();
            equipmentInfo.setEquipmentID(tChargingPile.getId().toString());
            equipmentInfo.setEquipmentClassification(1);
            equipmentInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now()));
            equipmentInfo.setEquipRealTimePower(tChargingPile.getRatedPower());
            //构建设备接口信息
            equipmentInfo.setConnectorPowerInfos(buildConnectorPowerInfos(tChargingPile.getId(), tChargingPile.getCode(), chargingGunList));
            equipmentInfos.add(equipmentInfo);
        }
        return equipmentInfos;
    }
    public List<SupConnectorPowerInfo> buildConnectorPowerInfos(Integer chargingPileId, String code, List<TChargingGun> chargingGunList){
        List<SupConnectorPowerInfo> connectorInfos = new ArrayList<>();
        List<TChargingGun> collect = chargingGunList.stream().filter(s -> s.getChargingPileId().equals(chargingPileId)).collect(Collectors.toList());
        for (TChargingGun chargingGun : collect) {
            SupConnectorPowerInfo connectorInfo = new SupConnectorPowerInfo();
            connectorInfo.setConnectorID(chargingGun.getFullNumber());
            connectorInfo.setEquipmentClassification(chargingGun.getEquipmentClassification());
            connectorInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now()));
            connectorInfo.setConnectorRealTimePower(chargingGun.getChargingPower());
            connectorInfos.add(connectorInfo);
        }
        return connectorInfos;
    }
    private final static String query_token = "/query_token";
    private static final String OperatorID = "MA01H3BQ2";
    private static final String OperatorSecret = "f1331ef0b37c2d1b";
    private static final String SigSecret = "a6fedf0e1b27d6f7";
    private static final String DataSecret = "50a61b93919c9604";
    private static final String DataSecretIV = "7c8ac6861661d584";
    private final static String url = "https://dev-gov-hlht-sc.unievbj.com/evcs/v1.0.0";
    @Test
    public void test5(){
        HttpRequest post = HttpUtil.createPost(url + query_token);
        JSONObject info = new JSONObject();
        info.put("OperatorID", "906171535");
        info.put("OperatorSecret", OperatorSecret);
        Long timeStamp = Long.valueOf(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
        post.contentType("application/json;charset=utf-8");
        BaseRequestJianGuan baseRequest = new BaseRequestJianGuan();
        baseRequest.setOperatorID("906171535");
        baseRequest.setTimeStamp(timeStamp+"");
        baseRequest.setSeq("0001");
        String jsonString = JacksonUtils.toJson(info);
        SequenceGenerator generator = new SequenceGenerator();
        String nextSequence = generator.getNextSequence();
        String data = AesEncryption.encrypt(DataSecret, DataSecretIV,jsonString);
        String hmacMD5 = HMacMD5Util.getHMacMD5("906171535",timeStamp+"", data,nextSequence,SigSecret);
        baseRequest.setData(data);
        baseRequest.setSig(hmacMD5);
        String request_json = JacksonUtils.toJson(baseRequest);
        log.info("获取三方平台授权token请求地址:" + post.getUrl());
        log.info("获取三方平台授权token请求参数:" + request_json);
        log.info("获取三方平台授权token请求Data:" + jsonString);
        post.body(request_json);
        HttpResponse execute = post.execute();
        if(200 != execute.getStatus()){
            log.error("获取三方平台授权token失败:" + execute.body());
        }
        log.info("获取三方平台授权token响应参数:" + execute.body());
        BaseResult baseResult = com.alibaba.fastjson.JSON.parseObject(execute.body(), BaseResult.class);
        Integer Ret = baseResult.getRet();
        if(0 != Ret){
            log.error("获取三方平台授权token失败:" + baseResult.getMsg());
        }
        //解密参数
        String decrypt = AESUtil.decrypt(baseResult.getData(), DataSecret, DataSecretIV);
        log.info("获取三方平台授权token响应Data:" + decrypt);
        QueryTokenResult queryTokenResult = JSON.parseObject(decrypt, QueryTokenResult.class);
        String token = queryTokenResult.getAccessToken();
//        Long tokenAvailableTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + queryTokenResult.getTokenAvailab
        System.err.println(token);
    }
}
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java
@@ -2332,6 +2332,15 @@
        return chargingOrderService.getNotPaymentChargingOrder();
    }
    
    /**
     * 手动推送订单给三方平台
     * @param code
     * @return
     */
    @PostMapping("/pushOrderInfo")
    public R pushOrderInfo(@RequestParam String code){
        return chargingOrderService.pushOrderInfo(code);
    }
    
}
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java
@@ -310,4 +310,13 @@
     * @return
     */
    R getNotPaymentChargingOrder();
    /**
     *
     * 手动推送订单给第三方平台
     * @param code
     * @return
     */
    R pushOrderInfo(String code);
}
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java
@@ -697,15 +697,15 @@
        
        log.info(chargingOrder.getCode() + ":-------------------远程调起开始充电请求-------------------" + platformStartCharging.toString());
        sendMessageClient.platformStartCharging(platformStartCharging);
        //异步线程检测远程启动的应答结果。如果失败,则需要全额退款
        Long id = chargingOrder.getId();
        //执行5分钟的定时任务检测
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(()->{
            if(timingDetection(id)){
                scheduler.shutdown();
            }
        }, 5, 1, TimeUnit.SECONDS);
//        //异步线程检测远程启动的应答结果。如果失败,则需要全额退款
//        Long id = chargingOrder.getId();
//        //执行5分钟的定时任务检测
//        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
//        scheduler.scheduleAtFixedRate(()->{
//            if(timingDetection(id)){
//                scheduler.shutdown();
//            }
//        }, 5, 1, TimeUnit.SECONDS);
        return AjaxResult.success();
    }
    
@@ -3890,15 +3890,15 @@
        platformStartCharging.setAccount_balance(rechargeAmount);
        log.info(chargingOrder.getCode() + ":-------------------远程调起开始充电请求-------------------" + platformStartCharging.toString());
        sendMessageClient.platformStartCharging(platformStartCharging);
        //异步线程检测远程启动的应答结果。如果失败,则需要全额退款
        Long id = chargingOrder.getId();
        //执行5分钟的定时任务检测
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(()->{
            if(timingDetection(id)){
                scheduler.shutdown();
            }
        }, 5, 1, TimeUnit.SECONDS);
//        //异步线程检测远程启动的应答结果。如果失败,则需要全额退款
//        Long id = chargingOrder.getId();
//        //执行5分钟的定时任务检测
//        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
//        scheduler.scheduleAtFixedRate(()->{
//            if(timingDetection(id)){
//                scheduler.shutdown();
//            }
//        }, 5, 1, TimeUnit.SECONDS);
        
        //推送三方平台订单状态
        tcecClient.notificationEquipChargeStatus(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId());
@@ -3967,4 +3967,21 @@
        }
        return R.ok(mapList);
    }
    /**
     *
     * 手动推送订单给第三方平台
     * @param code
     * @return
     */
    @Override
    public R pushOrderInfo(String code) {
        TChargingOrder chargingOrder = this.getOne(new LambdaQueryWrapper<TChargingOrder>().eq(TChargingOrder::getCode, code));
        tcecClient.notificationEquipChargeStatus(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId());
        tcecClient.notificationStopChargeResult(chargingOrder.getStartChargeSeq(), chargingOrder.getChargingGunId().toString(),
                chargingOrder.getOperatorId());
        tcecClient.notificationChargeOrderInfo(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId());
        return R.ok();
    }
}