Pu Zhibing
2024-10-09 d5d181b911b6de2935fe14249ef958344c3fbf40
Merge remote-tracking branch 'origin/master'
7个文件已修改
2个文件已添加
645 ■■■■■ 已修改文件
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/model/ChargingMessage.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java 481 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java
@@ -6,6 +6,10 @@
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@@ -20,6 +24,7 @@
@SpringBootApplication
@EnableScheduling//开启定时任务
@EnableTransactionManagement//开启事务
@EnableBinding({ Source.class, Sink.class })
public class RuoYiIntegrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(RuoYiIntegrationApplication.class, args);
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java
@@ -126,4 +126,5 @@
     * 安全监测
     */
    public final static String SECURITY_DETECTION ="security_detection";
    public static final String CHARGING_MESSAGE ="charging_message";
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
@@ -68,12 +68,15 @@
        writer.write("服务id:"+service_id+"\n");
        writer.close();
        SendResult sendResult;
        ChargingMessage chargingMessage = new ChargingMessage();
        chargingMessage.setServiceId(service_id);
        // 设备消息下发
        String result;
        switch (service_id){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class);
                sendResult = enhanceProduce.onlineMessage(onlineMessage);
                chargingMessage.setOnlineMessage(onlineMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                // 业务处理 登录认证应答
                OnlineReply onlineReply = new OnlineReply();
@@ -89,7 +92,8 @@
                break;
            case SendTagConstant.PING:
                PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class);
                sendResult = enhanceProduce.pingMessage(pingMessage);
                chargingMessage.setPingMessage(pingMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                Pong pong = new Pong();
                pong.setCharging_pile_code(pingMessage.getCharging_pile_code());
@@ -100,17 +104,20 @@
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class);
                sendResult = enhanceProduce.endChargeMessage(endChargeMessage);
                chargingMessage.setEndChargeMessage(endChargeMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class);
                sendResult = enhanceProduce.errorMessageMessage(errorMessageMessage);
                chargingMessage.setErrorMessageMessage(errorMessageMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.BILLING_MODE_VERIFY:
                BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class);
                sendResult = enhanceProduce.billingModeVerifyMessage(billingModeVerifyMessage);
                chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply();
                if(billingModeVerifyMessage.getBilling_model_code().equals("0")){
@@ -137,7 +144,8 @@
                break;
            case SendTagConstant.ACQUISITION_BILLING_MODE:
                AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class);
                sendResult = enhanceProduce.acquisitionBillingModeMessage(acquisitionBillingModeMessage);
                chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件   计费模型请求应答  1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段
                List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code().substring(0,14)).getData();
                Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails);
@@ -154,40 +162,48 @@
                break;
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class);
                sendResult = enhanceProduce.uploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage);
                chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CHARGING_HANDSHAKE:
                ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class);
                sendResult = enhanceProduce.chargingHandshakeMessage(chargingHandshakeMessage);
                chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class);
                sendResult = enhanceProduce.parameterSettingMessage(parameterSettingMessage);
                chargingMessage.setParameterSettingMessage(parameterSettingMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                break;
            case SendTagConstant.BMS_ABORT:
                BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class);
                sendResult = enhanceProduce.bmsAbortMessage(bmsAbortMessage);
                chargingMessage.setBmsAbortMessage(bmsAbortMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class);
                sendResult = enhanceProduce.motorAbortMessage(motorAbortMessage);
                chargingMessage.setMotorAbortMessage(motorAbortMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class);
                sendResult = enhanceProduce.bmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage);
                chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class);
                sendResult = enhanceProduce.bmsInformationMessage(bmsInformationMessage);
                chargingMessage.setBmsInformationMessage(bmsInformationMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class);
                sendResult = enhanceProduce.chargingPileStartsChargingMessage(chargingPileStartsChargingMessage);
                chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging();
                platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code());
@@ -200,17 +216,20 @@
                break;
            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class);
                sendResult = enhanceProduce.platformStartChargingReplyMessage(platformStartChargingReplyMessage);
                chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class);
                sendResult = enhanceProduce.platformStopChargingReplyMessage(platformStopChargingReplyMessage);
                chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class);
                sendResult = enhanceProduce.transactionRecordMessage(transactionRecordMessage);
                chargingMessage.setTransactionRecordMessage(transactionRecordMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord();
                confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number());
@@ -219,27 +238,32 @@
                break;
            case SendTagConstant.UPDATE_BALANCE_REPLY:
                UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class);
                sendResult = enhanceProduce.updateBalanceReplyMessage(updateBalanceReplyMessage);
                chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class);
                sendResult = enhanceProduce.synchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage);
                chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class);
                sendResult = enhanceProduce.clearOfflineCardReplyMessage(clearOfflineCardReplyMessage);
                chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class);
                sendResult = enhanceProduce.workingParameterSettingReplyMessage(workingParameterSettingReplyMessage);
                chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.TIMING_SETTING:
                TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class);
                sendResult = enhanceProduce.timingSettingMessage(timingSettingMessage);
                chargingMessage.setTimingSettingMessage(timingSettingMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件 对时设置应答
                TimingSettingReply timingSettingReply = new TimingSettingReply();
                timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code());
@@ -248,37 +272,44 @@
                break;
            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class);
                sendResult = enhanceProduce.setupBillingModelReplyMessage(setupBillingModelReplyMessage);
                chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class);
                sendResult = enhanceProduce.groundLockRealTimeDataMessage(groundLockRealTimeDataMessage);
                chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class);
                sendResult = enhanceProduce.chargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage);
                chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.PLATFORM_RESTART_REPLY:
                PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class);
                sendResult = enhanceProduce.platformRestartReplyMessage(platformRestartReplyMessage);
                chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class);
                sendResult = enhanceProduce.qrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage);
                chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            case SendTagConstant.SECURITY_DETECTION:
                SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class);
                sendResult = enhanceProduce.securityDetectionMessage(securityDetectionMessage);
                chargingMessage.setSecurityDetectionMessage(securityDetectionMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class);
                sendResult = enhanceProduce.platformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage);
                chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage);
                sendResult = enhanceProduce.chargingMessage(chargingMessage);
                // 响应硬件
                break;
        }
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java
@@ -27,4 +27,9 @@
     * 重试次数,用于判断重试次数,超过重试次数发送异常警告
     */
    protected Integer retryTimes = 0;
    /**
     * 服务id
     */
    protected String serviceId;
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java
@@ -35,11 +35,6 @@
    @Autowired
    private EndChargeService endChargeService;
    @Autowired
    private MessageUtil messageUtil;
    @Autowired
    private IotMessageProduce iotMessageProduce;
    @Resource
    private ChargingOrderClient chargingOrderClient;
    
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java
@@ -33,10 +33,6 @@
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private IotMessageProduce iotMessageProduce;
    @Autowired
    private MessageUtil messageUtil;
    @Override
    protected void handleMessage(TimingSettingMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/model/ChargingMessage.java
New file
@@ -0,0 +1,44 @@
package com.ruoyi.integration.rocket.model;
import com.ruoyi.integration.rocket.base.BaseMessage;
import lombok.Data;
import org.apache.poi.ss.formula.functions.T;
@Data
public class ChargingMessage extends BaseMessage {
    private AcquisitionBillingModeMessage acquisitionBillingModeMessage;
    private BillingModeVerifyMessage billingModeVerifyMessage;
    private BmsAbortMessage bmsAbortMessage;
    private BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage;
    private BmsInformationMessage bmsInformationMessage;
    private ChargingHandshakeMessage chargingHandshakeMessage;
    private ChargingMessage chargingMessage;
    private ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage;
    private ChargingPileStartsChargingMessage chargingPileStartsChargingMessage;
    private ClearOfflineCardReplyMessage clearOfflineCardReplyMessage;
    private EndChargeMessage endChargeMessage;
    private ErrorMessageMessage errorMessageMessage;
    private GroundLockRealTimeDataMessage groundLockRealTimeDataMessage;
    private MotorAbortMessage motorAbortMessage;
    private OnlineMessage onlineMessage;
    private ParameterSettingMessage parameterSettingMessage;
    private PingMessage pingMessage;
    private PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage;
    private PlatformRestartReplyMessage platformRestartReplyMessage;
    private PlatformStartChargingReplyMessage platformStartChargingReplyMessage;
    private PlatformStopChargingReplyMessage platformStopChargingReplyMessage;
    private QrCodeDeliveryMessage qrCodeDeliveryMessage;
    private QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage;
    private QueryOfflineCardReplyMessage queryOfflineCardReplyMessage;
    private SecurityDetectionMessage securityDetectionMessage;
    private SetupBillingModelReplyMessage setupBillingModelReplyMessage;
    private SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage;
    private TimingSettingMessage timingSettingMessage;
    private TimingSettingReplyMessage timingSettingReplyMessage;
    private TransactionRecordMessage transactionRecordMessage;
    private UpdateBalanceReplyMessage updateBalanceReplyMessage;
    private UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage;
    private WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage;
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
New file
@@ -0,0 +1,481 @@
package com.ruoyi.integration.rocket.produce;
import com.alibaba.fastjson.JSON;
import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient;
import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail;
import com.ruoyi.chargingPile.api.model.TChargingGun;
import com.ruoyi.chargingPile.api.model.TFaultMessage;
import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter;
import com.ruoyi.integration.mongodb.service.*;
import com.ruoyi.integration.rocket.model.*;
import com.ruoyi.integration.rocket.util.EnhanceMessageHandler;
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
import com.ruoyi.order.api.model.TChargingOrder;
import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Objects;
@Slf4j
@Component
@RocketMQMessageListener(
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_charging_message",
        topic = "charge_charging_message",
        selectorExpression = "charging_message",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> {
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Autowired
    private BmsAbortService bmsAbortService;
    @Resource
    private ChargingOrderClient chargingOrderClient;
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    private OnlineService onlineService;
    @Autowired
    private PingService pingService;
    @Autowired
    private EndChargeService endChargeService;
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Resource
    private AccountingStrategyDetailClient accountingStrategyDetailClient;
    @Resource
    private ChargingGunClient chargingGunClient;
    @Resource
    private FaultMessageClient faultMessageClient;
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Autowired
    private MotorAbortService motorAbortService;
    @Autowired
    private BmsInformationService bmsInformationService;
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    @StreamListener("input")
    @Override
    protected void handleMessage(ChargingMessage message) throws Exception {
        String serviceId = message.getServiceId();
        if(!StringUtils.hasLength(serviceId)){
            return;
        }
        switch (serviceId){
            case SendTagConstant.ONLINE:
                OnlineMessage onlineMessage = message.getOnlineMessage();
                log.info("充电桩登录认证业务消息处理:{}",onlineMessage);
                // 持久化消息
                Online online = new Online();
                BeanUtils.copyProperties(onlineMessage,online);
                onlineService.create(online);
                break;
            case SendTagConstant.PING:
                PingMessage pingMessage = message.getPingMessage();
                log.info("充电桩心跳包-业务消息处理:{}",pingMessage);
                // 持久化消息
                Ping ping = new Ping();
                BeanUtils.copyProperties(pingMessage,ping);
                pingService.create(ping);
                break;
            case SendTagConstant.END_CHARGE:
                EndChargeMessage endChargeMessage = message.getEndChargeMessage();
                log.info("充电结束-业务消息处理:{}",endChargeMessage);
                // 持久化消息
                EndCharge endCharge = new EndCharge();
                BeanUtils.copyProperties(endChargeMessage,endCharge);
                endChargeService.create(endCharge);
                // 业务处理
                chargingOrderClient.endCharge(endCharge.getTransaction_serial_number());
                break;
            case SendTagConstant.ERROR_MESSAGE:
                ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage();
                log.info("错误报文-业务消息处理:{}",errorMessageMessage1);
                // 持久化消息
                ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage();
                BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage);
                errorMessageMessageService.create(errorMessageMessage);
                break;
            case SendTagConstant.BILLING_MODE_VERIFY:
                BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage();
                log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage);
                // 持久化消息
                BillingModeVerify billingModeVerify = new BillingModeVerify();
                BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify);
                billingModeVerifyService.create(billingModeVerify);
                break;
            case SendTagConstant.ACQUISITION_BILLING_MODE:
                AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage();
                log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage);
                // 持久化消息
                AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode();
                BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode);
                acquisitionBillingModeService.create(acquisitionBillingMode);
                break;
            case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA:
                UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage();
                log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage);
                // 持久化消息
                UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData();
                BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData);
                // 查询mogondb上一条数据
                UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number());
                // 查询订单
                TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData();
                // 查询当前时间段的计费策略
                TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData();
                uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence());
                uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge());
                if (Objects.nonNull(data)) {
                    uploadRealTimeMonitoringData.setLast_time(data.getLast_time());
                    uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().divide(data.getPaid_amount()));
                    uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().divide(data.getCharging_degree()));
                    uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                }else {
                    log.info("首次上传实时监测数据");
                    uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount());
                    uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree());
                    uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP));
                }
                uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData);
                // 业务处理
                UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery();
                BeanUtils.copyProperties(uploadRealTimeMonitoringData, query);
                chargingOrderClient.chargeMonitoring(query);
                GetChargingGunByCode code = new GetChargingGunByCode();
                code.setCharging_pile_code(uploadRealTimeMonitoringDataMessage.getCharging_pile_code());
                code.setCharging_gun_code(uploadRealTimeMonitoringDataMessage.getCharging_gun_code());
                TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData();
                if(Objects.nonNull(chargingGun)){
                    // 存储状态信息
                    TFaultMessage faultMessage = new TFaultMessage();
                    if(uploadRealTimeMonitoringDataMessage.getCharging_gun_status().equals(0) || uploadRealTimeMonitoringDataMessage.getCharging_gun_status().equals(1)){
                        faultMessage.setSiteId(chargingGun.getSiteId());
                        faultMessage.setChargingPileId(chargingGun.getChargingPileId());
                        faultMessage.setChargingGunId(chargingGun.getId());
                        switch (uploadRealTimeMonitoringDataMessage.getCharging_gun_status()){
                            case 0:
                                faultMessage.setStatus(1);
                                chargingGun.setStatus(1);
                                break;
                            case 1:
                                faultMessage.setStatus(2);
                                chargingGun.setStatus(7);
                                break;
                        }
                        faultMessage.setDownTime(LocalDateTime.now());
                        faultMessageClient.createFaultMessage(faultMessage);
                    }else {
                        switch (uploadRealTimeMonitoringDataMessage.getCharging_gun_status()){
                            case 2:
                                chargingGun.setStatus(2);
                                break;
                            case 3:
                                chargingGun.setStatus(4);
                                break;
                        }
                        // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录
                        faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData();
                        if(Objects.nonNull(faultMessage)){
                            faultMessage.setEndTime(LocalDateTime.now());
                            faultMessageClient.updateFaultMessage(faultMessage);
                        }
                    }
                    chargingGunClient.updateChargingGunById(chargingGun);
                }
                break;
            case SendTagConstant.CHARGING_HANDSHAKE:
                ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage();
                log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage);
                // 持久化消息
                ChargingHandshake chargingHandshake = new ChargingHandshake();
                BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake);
                chargingHandshakeService.create(chargingHandshake);
                break;
            case SendTagConstant.PARAMETER_SETTING:
                ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage();
                log.info("业务消息处理:{}",parameterSettingMessage);
                // 持久化消息
                ParameterSetting parameterSetting = new ParameterSetting();
                BeanUtils.copyProperties(parameterSettingMessage,parameterSetting);
                parameterSettingService.create(parameterSetting);
                break;
            case SendTagConstant.BMS_ABORT:
                BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage();
                log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage);
                // 持久化消息
                BmsAbort bmsAbort = new BmsAbort();
                BeanUtils.copyProperties(bmsAbortMessage,bmsAbort);
                bmsAbortService.create(bmsAbort);
                // 业务处理
                chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number());
                break;
            case SendTagConstant.MOTOR_ABORT:
                MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage();
                log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage);
                // 持久化消息
                MotorAbort motorAbort = new MotorAbort();
                BeanUtils.copyProperties(motorAbortMessage,motorAbort);
                motorAbortService.create(motorAbort);
                // 业务处理
                chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number());
                break;
            case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION:
                BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage();
                log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage);
                // 持久化消息
                BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation();
                BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation);
                bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation);
                // 业务处理
                TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData();
                if(Objects.nonNull(chargingOrderBms)){
                    chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements());
                    chargingOrderClient.updateChargingOrder(chargingOrderBms);
                }
                break;
            case SendTagConstant.BMS_INFORMATION:
                BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage();
                log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage);
                // 持久化消息
                BmsInformation bmsInformation = new BmsInformation();
                BeanUtils.copyProperties(bmsInformationMessage,bmsInformation);
                bmsInformationService.create(bmsInformation);
                break;
            case SendTagConstant.CHARGING_PILE_STARTS_CHARGING:
                ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage();
                log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage);
                // 持久化消息
                ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging();
                BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging);
                chargingPileStartsChargingService.create(chargingPileStartsCharging);
                break;
            case SendTagConstant.PLATFORM_START_CHARGING_REPLY:
                PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage();
                log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage);
                // 持久化消息
                PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply();
                BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply);
                platformStartChargingReplyService.create(platformStartChargingReply);
                // 业务处理
                com.ruoyi.order.api.vo.PlatformStartChargingReplyMessage message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessage();
                BeanUtils.copyProperties(platformStartChargingReplyMessage, message1);
                chargingOrderClient.startChargeSuccessfully(message1);
                break;
            case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY:
                PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage();
                log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage);
                // 持久化消息
                PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply();
                BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply);
                platformStopChargingReplyService.create(platformStopChargingReply);
                break;
            case SendTagConstant.TRANSACTION_RECORD:
                TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage();
                log.info("交易记录-业务消息处理:{}",transactionRecordMessage);
                // 持久化消息
                TransactionRecord transactionRecord = new TransactionRecord();
                BeanUtils.copyProperties(transactionRecordMessage,transactionRecord);
                transactionRecordService.create(transactionRecord);
                // 业务处理
                TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData();
                if(Objects.nonNull(chargingOrderRecord)){
                    chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity());
                    chargingOrderClient.updateChargingOrder(chargingOrderRecord);
                }
                break;
            case SendTagConstant.UPDATE_BALANCE_REPLY:
                UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage();
                log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage);
                // 持久化消息
                UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
                BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply);
                updateBalanceReplyService.create(updateBalanceReply);
                break;
            case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY:
                SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage();
                log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage);
                // 持久化消息
                SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply();
                BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply);
                synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply);
                break;
            case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY:
                ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage();
                log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage);
                // 持久化消息
                ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply();
                BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply);
                clearOfflineCardReplyService.create(clearOfflineCardReply);
                break;
            case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY:
                WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage();
                log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage);
                // 持久化消息
                WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
                BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply);
                workingParameterSettingReplyService.create(workingParameterSettingReply);
                break;
            case SendTagConstant.TIMING_SETTING:
                TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage();
                log.info("对时设置-业务消息处理:{}",timingSettingMessage);
                // 持久化消息
                TimingSetting timingSetting = new TimingSetting();
                BeanUtils.copyProperties(timingSettingMessage,timingSetting);
                timingSettingService.create(timingSetting);
                break;
            case SendTagConstant.SETUP_BILLING_MODEL_REPLY:
                SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage();
                log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage);
                // 持久化消息
                SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply();
                BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply);
                setupBillingModelReplyService.create(setupBillingModelReply);
                break;
            case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA:
                GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage();
                log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage);
                // 持久化消息
                GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData();
                BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData);
                groundLockRealTimeDataService.create(groundLockRealTimeData);
                break;
            case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA:
                ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage();
                log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage);
                // 持久化消息
                ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData();
                BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData);
                chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData);
                break;
            case SendTagConstant.PLATFORM_RESTART_REPLY:
                PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage();
                log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage);
                // 持久化消息
                PlatformRestartReply platformRestartReply = new PlatformRestartReply();
                BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply);
                platformRestartReplyService.create(platformRestartReply);
                break;
            case SendTagConstant.QR_CODE_DELIVERY_REPLY:
                QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage();
                log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage);
                QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply();
                BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply);
                qrCodeDeliveryReplyService.create(qrCodeDeliveryReply);
                break;
            case SendTagConstant.SECURITY_DETECTION:
                SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage();
                log.info("安全监测-业务消息处理:{}",securityDetectionMessage);
                SecurityDetection securityDetection = new SecurityDetection();
                BeanUtils.copyProperties(securityDetectionMessage,securityDetection);
                securityDetectionService.create(securityDetection);
                break;
            default:
                PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage();
                log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage);
                PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply();
                BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply);
                platformRemoteUpdateReplyService.create(platformRemoteUpdateReply);
                break;
        }
    }
    @Override
    protected void handleMaxRetriesExceeded(ChargingMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }
    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }
    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
    /**
     * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    @Override
    protected boolean filter(ChargingMessage message) {
        // 此处可做消息过滤
        return false;
    }
    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(ChargingMessage message) {
        super.dispatchMessage(message);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java
@@ -343,4 +343,15 @@
        return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.SECURITY_DETECTION, SendTagConstant.SECURITY_DETECTION, message);
    }
    /**
     * 充电桩登录认证
     */
    public SendResult chargingMessage(ChargingMessage message) {
        // 设置业务key
        message.setKey(UUID.randomUUID().toString());
        // 设置消息来源,便于查询
        message.setSource(SendTagConstant.CHARGING_MESSAGE);
        return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_MESSAGE, SendTagConstant.CHARGING_MESSAGE, message);
    }
}