From 0a3316920cca90f68a25509ae0c948b80a7daaaa Mon Sep 17 00:00:00 2001 From: xuhy <3313886187@qq.com> Date: 星期三, 09 十月 2024 09:11:34 +0800 Subject: [PATCH] 修改对列 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java | 5 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java | 1 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java | 4 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java | 11 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java | 5 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/model/ChargingMessage.java | 44 ++++ ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java | 89 +++++-- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java | 481 +++++++++++++++++++++++++++++++++++++++++++ ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java | 5 9 files changed, 607 insertions(+), 38 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java index a05ffbf..a6e9f2b 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java @@ -6,6 +6,10 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.messaging.Source; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -20,6 +24,7 @@ @SpringBootApplication @EnableScheduling//开启定时任务 @EnableTransactionManagement//开启事务 +@EnableBinding({ Source.class, Sink.class }) public class RuoYiIntegrationApplication { public static void main(String[] args) { SpringApplication.run(RuoYiIntegrationApplication.class, args); diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java index 6d92d1a..9b263b9 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/constant/SendTagConstant.java @@ -126,4 +126,5 @@ * 安全监测 */ public final static String SECURITY_DETECTION ="security_detection"; + public static final String CHARGING_MESSAGE ="charging_message"; } diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java index bbc01a0..6684de3 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java @@ -68,12 +68,15 @@ writer.write("服务id:"+service_id+"\n"); writer.close(); SendResult sendResult; + ChargingMessage chargingMessage = new ChargingMessage(); + chargingMessage.setServiceId(service_id); // 设备消息下发 String result; switch (service_id){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class); - sendResult = enhanceProduce.onlineMessage(onlineMessage); + chargingMessage.setOnlineMessage(onlineMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 // 业务处理 登录认证应答 OnlineReply onlineReply = new OnlineReply(); @@ -89,7 +92,8 @@ break; case SendTagConstant.PING: PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class); - sendResult = enhanceProduce.pingMessage(pingMessage); + chargingMessage.setPingMessage(pingMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 Pong pong = new Pong(); pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); @@ -100,17 +104,20 @@ break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class); - sendResult = enhanceProduce.endChargeMessage(endChargeMessage); + chargingMessage.setEndChargeMessage(endChargeMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class); - sendResult = enhanceProduce.errorMessageMessage(errorMessageMessage); + chargingMessage.setErrorMessageMessage(errorMessageMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class); - sendResult = enhanceProduce.billingModeVerifyMessage(billingModeVerifyMessage); + chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ @@ -137,7 +144,8 @@ break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class); - sendResult = enhanceProduce.acquisitionBillingModeMessage(acquisitionBillingModeMessage); + chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code().substring(0,14)).getData(); Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); @@ -154,40 +162,48 @@ break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class); - sendResult = enhanceProduce.uploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); + chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class); - sendResult = enhanceProduce.chargingHandshakeMessage(chargingHandshakeMessage); + chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class); - sendResult = enhanceProduce.parameterSettingMessage(parameterSettingMessage); + chargingMessage.setParameterSettingMessage(parameterSettingMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class); - sendResult = enhanceProduce.bmsAbortMessage(bmsAbortMessage); + chargingMessage.setBmsAbortMessage(bmsAbortMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class); - sendResult = enhanceProduce.motorAbortMessage(motorAbortMessage); + chargingMessage.setMotorAbortMessage(motorAbortMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class); - sendResult = enhanceProduce.bmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); + chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class); - sendResult = enhanceProduce.bmsInformationMessage(bmsInformationMessage); + chargingMessage.setBmsInformationMessage(bmsInformationMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class); - sendResult = enhanceProduce.chargingPileStartsChargingMessage(chargingPileStartsChargingMessage); + chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); @@ -200,17 +216,20 @@ break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class); - sendResult = enhanceProduce.platformStartChargingReplyMessage(platformStartChargingReplyMessage); + chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class); - sendResult = enhanceProduce.platformStopChargingReplyMessage(platformStopChargingReplyMessage); + chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class); - sendResult = enhanceProduce.transactionRecordMessage(transactionRecordMessage); + chargingMessage.setTransactionRecordMessage(transactionRecordMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); @@ -219,27 +238,32 @@ break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class); - sendResult = enhanceProduce.updateBalanceReplyMessage(updateBalanceReplyMessage); + chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class); - sendResult = enhanceProduce.synchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); + chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class); - sendResult = enhanceProduce.clearOfflineCardReplyMessage(clearOfflineCardReplyMessage); + chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class); - sendResult = enhanceProduce.workingParameterSettingReplyMessage(workingParameterSettingReplyMessage); + chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class); - sendResult = enhanceProduce.timingSettingMessage(timingSettingMessage); + chargingMessage.setTimingSettingMessage(timingSettingMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 对时设置应答 TimingSettingReply timingSettingReply = new TimingSettingReply(); timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); @@ -248,37 +272,44 @@ break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class); - sendResult = enhanceProduce.setupBillingModelReplyMessage(setupBillingModelReplyMessage); + chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class); - sendResult = enhanceProduce.groundLockRealTimeDataMessage(groundLockRealTimeDataMessage); + chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class); - sendResult = enhanceProduce.chargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); + chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class); - sendResult = enhanceProduce.platformRestartReplyMessage(platformRestartReplyMessage); + chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class); - sendResult = enhanceProduce.qrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); + chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class); - sendResult = enhanceProduce.securityDetectionMessage(securityDetectionMessage); + chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class); - sendResult = enhanceProduce.platformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); + chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); + sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; } diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java index 6129be7..fa708e9 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/base/BaseMessage.java @@ -27,4 +27,9 @@ * 重试次数,用于判断重试次数,超过重试次数发送异常警告 */ protected Integer retryTimes = 0; + + /** + * 服务id + */ + protected String serviceId; } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java index 19a3046..bb8d506 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java @@ -35,11 +35,6 @@ @Autowired private EndChargeService endChargeService; - @Autowired - private MessageUtil messageUtil; - @Autowired - private IotMessageProduce iotMessageProduce; - @Resource private ChargingOrderClient chargingOrderClient; diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java index fc49297..635ed19 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java @@ -33,10 +33,6 @@ @Autowired private TimingSettingService timingSettingService; - @Autowired - private IotMessageProduce iotMessageProduce; - @Autowired - private MessageUtil messageUtil; @Override protected void handleMessage(TimingSettingMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/model/ChargingMessage.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/model/ChargingMessage.java new file mode 100644 index 0000000..149cd59 --- /dev/null +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/model/ChargingMessage.java @@ -0,0 +1,44 @@ +package com.ruoyi.integration.rocket.model; + +import com.ruoyi.integration.rocket.base.BaseMessage; +import lombok.Data; +import org.apache.poi.ss.formula.functions.T; + +@Data +public class ChargingMessage extends BaseMessage { + + private AcquisitionBillingModeMessage acquisitionBillingModeMessage; + private BillingModeVerifyMessage billingModeVerifyMessage; + private BmsAbortMessage bmsAbortMessage; + private BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage; + private BmsInformationMessage bmsInformationMessage; + private ChargingHandshakeMessage chargingHandshakeMessage; + private ChargingMessage chargingMessage; + private ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage; + private ChargingPileStartsChargingMessage chargingPileStartsChargingMessage; + private ClearOfflineCardReplyMessage clearOfflineCardReplyMessage; + private EndChargeMessage endChargeMessage; + private ErrorMessageMessage errorMessageMessage; + private GroundLockRealTimeDataMessage groundLockRealTimeDataMessage; + private MotorAbortMessage motorAbortMessage; + private OnlineMessage onlineMessage; + private ParameterSettingMessage parameterSettingMessage; + private PingMessage pingMessage; + private PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage; + private PlatformRestartReplyMessage platformRestartReplyMessage; + private PlatformStartChargingReplyMessage platformStartChargingReplyMessage; + private PlatformStopChargingReplyMessage platformStopChargingReplyMessage; + private QrCodeDeliveryMessage qrCodeDeliveryMessage; + private QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage; + private QueryOfflineCardReplyMessage queryOfflineCardReplyMessage; + private SecurityDetectionMessage securityDetectionMessage; + private SetupBillingModelReplyMessage setupBillingModelReplyMessage; + private SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage; + private TimingSettingMessage timingSettingMessage; + private TimingSettingReplyMessage timingSettingReplyMessage; + private TransactionRecordMessage transactionRecordMessage; + private UpdateBalanceReplyMessage updateBalanceReplyMessage; + private UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage; + private WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage; + +} \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java new file mode 100644 index 0000000..25e0031 --- /dev/null +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java @@ -0,0 +1,481 @@ +package com.ruoyi.integration.rocket.produce; + +import com.alibaba.fastjson.JSON; +import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; +import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; +import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient; +import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; +import com.ruoyi.chargingPile.api.model.TChargingGun; +import com.ruoyi.chargingPile.api.model.TFaultMessage; +import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode; +import com.ruoyi.integration.api.model.*; +import com.ruoyi.integration.iotda.constant.SendTagConstant; +import com.ruoyi.integration.iotda.enums.ServiceIdMenu; +import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter; +import com.ruoyi.integration.mongodb.service.*; +import com.ruoyi.integration.rocket.model.*; +import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; +import com.ruoyi.order.api.feignClient.ChargingOrderClient; +import com.ruoyi.order.api.model.TChargingOrder; +import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.math.RoundingMode; +import java.time.LocalDateTime; +import java.util.Date; +import java.util.Objects; + +@Slf4j +@Component +@RocketMQMessageListener( + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_charging_message", + topic = "charge_charging_message", + selectorExpression = "charging_message", + consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 +) +public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { + + @Autowired + private AcquisitionBillingModeService acquisitionBillingModeService; + @Autowired + private BillingModeVerifyService billingModeVerifyService; + @Autowired + private BmsAbortService bmsAbortService; + @Resource + private ChargingOrderClient chargingOrderClient; + @Autowired + private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; + @Autowired + private OnlineService onlineService; + @Autowired + private PingService pingService; + @Autowired + private EndChargeService endChargeService; + @Autowired + private ErrorMessageMessageService errorMessageMessageService; + @Autowired + private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; + @Resource + private AccountingStrategyDetailClient accountingStrategyDetailClient; + @Resource + private ChargingGunClient chargingGunClient; + @Resource + private FaultMessageClient faultMessageClient; + @Autowired + private ChargingHandshakeService chargingHandshakeService; + @Autowired + private ParameterSettingService parameterSettingService; + @Autowired + private MotorAbortService motorAbortService; + @Autowired + private BmsInformationService bmsInformationService; + @Autowired + private ChargingPileStartsChargingService chargingPileStartsChargingService; + @Autowired + private PlatformStartChargingReplyService platformStartChargingReplyService; + @Autowired + private PlatformStopChargingReplyService platformStopChargingReplyService; + @Autowired + private TransactionRecordService transactionRecordService; + @Autowired + private UpdateBalanceReplyService updateBalanceReplyService; + @Autowired + private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; + @Autowired + private ClearOfflineCardReplyService clearOfflineCardReplyService; + @Autowired + private WorkingParameterSettingReplyService workingParameterSettingReplyService; + @Autowired + private TimingSettingService timingSettingService; + @Autowired + private SetupBillingModelReplyService setupBillingModelReplyService; + @Autowired + private GroundLockRealTimeDataService groundLockRealTimeDataService; + @Autowired + private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; + @Autowired + private PlatformRestartReplyService platformRestartReplyService; + @Autowired + private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; + @Autowired + private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; + @Autowired + private SecurityDetectionService securityDetectionService; + @StreamListener("input") + @Override + protected void handleMessage(ChargingMessage message) throws Exception { + String serviceId = message.getServiceId(); + if(!StringUtils.hasLength(serviceId)){ + return; + } + switch (serviceId){ + case SendTagConstant.ONLINE: + OnlineMessage onlineMessage = message.getOnlineMessage(); + log.info("充电桩登录认证业务消息处理:{}",onlineMessage); + // 持久化消息 + Online online = new Online(); + BeanUtils.copyProperties(onlineMessage,online); + onlineService.create(online); + break; + case SendTagConstant.PING: + PingMessage pingMessage = message.getPingMessage(); + log.info("充电桩心跳包-业务消息处理:{}",pingMessage); + // 持久化消息 + Ping ping = new Ping(); + BeanUtils.copyProperties(pingMessage,ping); + pingService.create(ping); + break; + case SendTagConstant.END_CHARGE: + EndChargeMessage endChargeMessage = message.getEndChargeMessage(); + log.info("充电结束-业务消息处理:{}",endChargeMessage); + // 持久化消息 + EndCharge endCharge = new EndCharge(); + BeanUtils.copyProperties(endChargeMessage,endCharge); + endChargeService.create(endCharge); + // 业务处理 + chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); + break; + case SendTagConstant.ERROR_MESSAGE: + ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); + log.info("错误报文-业务消息处理:{}",errorMessageMessage1); + // 持久化消息 + ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); + BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); + errorMessageMessageService.create(errorMessageMessage); + break; + case SendTagConstant.BILLING_MODE_VERIFY: + BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); + log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); + // 持久化消息 + BillingModeVerify billingModeVerify = new BillingModeVerify(); + BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); + billingModeVerifyService.create(billingModeVerify); + break; + case SendTagConstant.ACQUISITION_BILLING_MODE: + AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); + log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); + // 持久化消息 + AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); + BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); + acquisitionBillingModeService.create(acquisitionBillingMode); + break; + case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: + UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); + log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); + // 持久化消息 + UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); + BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); + // 查询mogondb上一条数据 + UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); + // 查询订单 + TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); + // 查询当前时间段的计费策略 + TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); + uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); + uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); + if (Objects.nonNull(data)) { + uploadRealTimeMonitoringData.setLast_time(data.getLast_time()); + uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().divide(data.getPaid_amount())); + uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().divide(data.getCharging_degree())); + uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); + }else { + log.info("首次上传实时监测数据"); + uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); + uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); + uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); + } + uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); + // 业务处理 + UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); + BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); + chargingOrderClient.chargeMonitoring(query); + GetChargingGunByCode code = new GetChargingGunByCode(); + code.setCharging_pile_code(uploadRealTimeMonitoringDataMessage.getCharging_pile_code()); + code.setCharging_gun_code(uploadRealTimeMonitoringDataMessage.getCharging_gun_code()); + TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData(); + if(Objects.nonNull(chargingGun)){ + // 存储状态信息 + TFaultMessage faultMessage = new TFaultMessage(); + if(uploadRealTimeMonitoringDataMessage.getCharging_gun_status().equals(0) || uploadRealTimeMonitoringDataMessage.getCharging_gun_status().equals(1)){ + faultMessage.setSiteId(chargingGun.getSiteId()); + faultMessage.setChargingPileId(chargingGun.getChargingPileId()); + faultMessage.setChargingGunId(chargingGun.getId()); + switch (uploadRealTimeMonitoringDataMessage.getCharging_gun_status()){ + case 0: + faultMessage.setStatus(1); + chargingGun.setStatus(1); + break; + case 1: + faultMessage.setStatus(2); + chargingGun.setStatus(7); + break; + } + faultMessage.setDownTime(LocalDateTime.now()); + faultMessageClient.createFaultMessage(faultMessage); + }else { + switch (uploadRealTimeMonitoringDataMessage.getCharging_gun_status()){ + case 2: + chargingGun.setStatus(2); + break; + case 3: + chargingGun.setStatus(4); + break; + } + // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录 + faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData(); + if(Objects.nonNull(faultMessage)){ + faultMessage.setEndTime(LocalDateTime.now()); + faultMessageClient.updateFaultMessage(faultMessage); + } + } + chargingGunClient.updateChargingGunById(chargingGun); + } + break; + case SendTagConstant.CHARGING_HANDSHAKE: + ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); + log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); + // 持久化消息 + ChargingHandshake chargingHandshake = new ChargingHandshake(); + BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); + chargingHandshakeService.create(chargingHandshake); + break; + case SendTagConstant.PARAMETER_SETTING: + ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); + log.info("业务消息处理:{}",parameterSettingMessage); + // 持久化消息 + ParameterSetting parameterSetting = new ParameterSetting(); + BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); + parameterSettingService.create(parameterSetting); + break; + case SendTagConstant.BMS_ABORT: + BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); + log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); + // 持久化消息 + BmsAbort bmsAbort = new BmsAbort(); + BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); + bmsAbortService.create(bmsAbort); + // 业务处理 + chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); + break; + case SendTagConstant.MOTOR_ABORT: + MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); + log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); + // 持久化消息 + MotorAbort motorAbort = new MotorAbort(); + BeanUtils.copyProperties(motorAbortMessage,motorAbort); + motorAbortService.create(motorAbort); + // 业务处理 + chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); + break; + case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: + BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); + log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); + // 持久化消息 + BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); + BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); + bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); + // 业务处理 + TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderBms)){ + chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); + chargingOrderClient.updateChargingOrder(chargingOrderBms); + } + break; + case SendTagConstant.BMS_INFORMATION: + BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); + log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); + // 持久化消息 + BmsInformation bmsInformation = new BmsInformation(); + BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); + bmsInformationService.create(bmsInformation); + break; + case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: + ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); + log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); + // 持久化消息 + ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); + BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); + chargingPileStartsChargingService.create(chargingPileStartsCharging); + break; + case SendTagConstant.PLATFORM_START_CHARGING_REPLY: + ChargingPileStartsChargingMessage chargingPileStartsChargingMessage1 = message.getChargingPileStartsChargingMessage(); + log.info("远程启机命令回复-业务消息处理:{}",chargingPileStartsChargingMessage1); + // 持久化消息 + PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); + BeanUtils.copyProperties(chargingPileStartsChargingMessage1,platformStartChargingReply); + platformStartChargingReplyService.create(platformStartChargingReply); + // 业务处理 + com.ruoyi.order.api.vo.PlatformStartChargingReplyMessage message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessage(); + BeanUtils.copyProperties(chargingPileStartsChargingMessage1, message1); + chargingOrderClient.startChargeSuccessfully(message1); + break; + case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: + PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); + log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); + // 持久化消息 + PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); + BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); + platformStopChargingReplyService.create(platformStopChargingReply); + break; + case SendTagConstant.TRANSACTION_RECORD: + TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); + log.info("交易记录-业务消息处理:{}",transactionRecordMessage); + // 持久化消息 + TransactionRecord transactionRecord = new TransactionRecord(); + BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); + transactionRecordService.create(transactionRecord); + // 业务处理 + TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderRecord)){ + chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); + chargingOrderClient.updateChargingOrder(chargingOrderRecord); + } + break; + case SendTagConstant.UPDATE_BALANCE_REPLY: + UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); + log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); + // 持久化消息 + UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); + BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); + updateBalanceReplyService.create(updateBalanceReply); + break; + case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: + SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); + log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); + // 持久化消息 + SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); + BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); + synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); + break; + case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: + ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); + log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); + // 持久化消息 + ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); + BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); + clearOfflineCardReplyService.create(clearOfflineCardReply); + break; + case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: + WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); + log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); + // 持久化消息 + WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); + BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); + workingParameterSettingReplyService.create(workingParameterSettingReply); + break; + case SendTagConstant.TIMING_SETTING: + TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); + log.info("对时设置-业务消息处理:{}",timingSettingMessage); + // 持久化消息 + TimingSetting timingSetting = new TimingSetting(); + BeanUtils.copyProperties(timingSettingMessage,timingSetting); + timingSettingService.create(timingSetting); + break; + case SendTagConstant.SETUP_BILLING_MODEL_REPLY: + SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); + log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); + // 持久化消息 + SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); + BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); + setupBillingModelReplyService.create(setupBillingModelReply); + break; + case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: + GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); + log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); + // 持久化消息 + GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); + BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); + groundLockRealTimeDataService.create(groundLockRealTimeData); + break; + case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: + ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); + log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); + // 持久化消息 + ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); + BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); + chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); + break; + case SendTagConstant.PLATFORM_RESTART_REPLY: + PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); + log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); + // 持久化消息 + PlatformRestartReply platformRestartReply = new PlatformRestartReply(); + BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); + platformRestartReplyService.create(platformRestartReply); + break; + case SendTagConstant.QR_CODE_DELIVERY_REPLY: + QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); + log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); + QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); + BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); + qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); + break; + case SendTagConstant.SECURITY_DETECTION: + SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); + log.info("安全监测-业务消息处理:{}",securityDetectionMessage); + SecurityDetection securityDetection = new SecurityDetection(); + BeanUtils.copyProperties(securityDetectionMessage,securityDetection); + securityDetectionService.create(securityDetection); + break; + default: + PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); + log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); + PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); + BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); + platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); + break; + } + } + + @Override + protected void handleMaxRetriesExceeded(ChargingMessage message) { + // 当超过指定重试次数消息时此处方法会被调用 + // 生产中可以进行回退或其他业务操作 + log.error("消息消费失败,请执行后续处理"); + } + + + /** + * 是否执行重试机制 + */ + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + // 是否抛出异常,false搭配retry自行处理异常 + return false; + } + + /** + * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 + * @param message 待处理消息 + * @return true: 本次消息被过滤,false:不过滤 + */ + @Override + protected boolean filter(ChargingMessage message) { + // 此处可做消息过滤 + return false; + } + + /** + * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 + */ + @Override + public void onMessage(ChargingMessage message) { + super.dispatchMessage(message); + } +} \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java index 21d9a0e..ce9a938 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java @@ -343,4 +343,15 @@ return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.SECURITY_DETECTION, SendTagConstant.SECURITY_DETECTION, message); } + /** + * 充电桩登录认证 + */ + public SendResult chargingMessage(ChargingMessage message) { + // 设置业务key + message.setKey(UUID.randomUUID().toString()); + // 设置消息来源,便于查询 + message.setSource(SendTagConstant.CHARGING_MESSAGE); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_MESSAGE, SendTagConstant.CHARGING_MESSAGE, message); + } + } \ No newline at end of file -- Gitblit v1.7.1