From 1adec9fead03f0f788a73f9349ccba86569e31f3 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期三, 30 四月 2025 19:40:11 +0800 Subject: [PATCH] 修改rocketmq连接方式和修改发起充电异常情况下将订单挂起的功能 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java | 55 ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java | 53 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java | 59 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java | 15 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java | 98 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java | 55 ruoyi-service/ruoyi-integration/pom.xml | 50 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java | 2 ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java | 392 ---- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java | 57 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java | 74 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java | 56 ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java | 9 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java | 5 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java | 57 ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java | 11 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java | 50 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java | 58 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java | 179 - ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java | 91 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java | 54 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java | 57 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java | 58 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java | 64 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java | 40 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java | 70 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java | 56 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java | 68 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java | 67 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java | 61 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java | 57 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java | 1039 ++++++------ ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java | 151 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java | 1251 +++++++------- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java | 63 41 files changed, 1,825 insertions(+), 3,093 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/pom.xml b/ruoyi-service/ruoyi-integration/pom.xml index ef03c91..9874e38 100644 --- a/ruoyi-service/ruoyi-integration/pom.xml +++ b/ruoyi-service/ruoyi-integration/pom.xml @@ -116,30 +116,36 @@ <scope>test</scope> </dependency> <!--rocketmq--> - <dependency> - <groupId>com.alibaba.cloud</groupId> - <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> - <version>2.2.2.RELEASE</version> - <exclusions> - <exclusion> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-acl</artifactId> - </exclusion> - </exclusions> - </dependency> +<!-- <dependency>--> +<!-- <groupId>com.alibaba.cloud</groupId>--> +<!-- <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>--> +<!-- <version>2.2.2.RELEASE</version>--> +<!-- <exclusions>--> +<!-- <exclusion>--> +<!-- <groupId>org.apache.rocketmq</groupId>--> +<!-- <artifactId>rocketmq-client</artifactId>--> +<!-- </exclusion>--> +<!-- <exclusion>--> +<!-- <groupId>org.apache.rocketmq</groupId>--> +<!-- <artifactId>rocketmq-acl</artifactId>--> +<!-- </exclusion>--> +<!-- </exclusions>--> +<!-- </dependency>--> +<!-- <dependency>--> +<!-- <groupId>org.apache.rocketmq</groupId>--> +<!-- <artifactId>rocketmq-client</artifactId>--> +<!-- <version>4.7.1</version>--> +<!-- </dependency>--> +<!-- <dependency>--> +<!-- <groupId>org.apache.rocketmq</groupId>--> +<!-- <artifactId>rocketmq-acl</artifactId>--> +<!-- <version>4.7.1</version>--> +<!-- </dependency>--> + <dependency> <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - <version>4.7.1</version> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-acl</artifactId> - <version>4.7.1</version> + <artifactId>rocketmq-spring-boot-starter</artifactId> + <version>2.3.0</version> </dependency> <!--mongodb--> diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java index a6e9f2b..a05ffbf 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java @@ -6,10 +6,6 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -24,7 +20,6 @@ @SpringBootApplication @EnableScheduling//开启定时任务 @EnableTransactionManagement//开启事务 -@EnableBinding({ Source.class, Sink.class }) public class RuoYiIntegrationApplication { public static void main(String[] args) { SpringApplication.run(RuoYiIntegrationApplication.class, args); diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java index 3d77b64..8349cae 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java @@ -300,7 +300,7 @@ public BaseResult queryStationsInfo(@RequestBody BaseRequest baseRequest, HttpServletRequest request){ log.info("三方平台查询充电站信息请求参数:" + JacksonUtils.toJson(baseRequest)); //校验token和签名 - BaseResult baseResult = requestCheckJianGuan(true, baseRequest, request); + BaseResult baseResult = requestCheck(true, baseRequest, request); if(0 != baseResult.getRet()){ log.info("三方平台查询充电站信息响应Data:"); baseResult.setData(""); @@ -399,8 +399,8 @@ stationInfo.setCountryCode(StringUtils.isNotEmpty(datum.getCountryCode()) ? datum.getCountryCode() : "CN"); stationInfo.setAreaCode(datum.getDistrictsCode()); stationInfo.setAddress(datum.getAddress()); - stationInfo.setStationTel(datum.getPhone()); - stationInfo.setServiceTel(serviceTel); + stationInfo.setStationTel(StringUtils.isNotEmpty(datum.getPhone()) ? datum.getPhone().split(",")[0] : ""); + stationInfo.setServiceTel(StringUtils.isNotEmpty(serviceTel) ? serviceTel.split(",")[0] : ""); switch (datum.getSiteType()){ case 0: stationInfo.setStationType(StationTypeEnum.OTHER.getType()); @@ -588,7 +588,7 @@ * @return */ @PostMapping("/pushChargingGunStatus") - public R pushChargingGunStatus(@RequestParam("fullNumber") String fullNumber, @RequestParam("status") Integer status){ + public R pushChargingGunStatus(@RequestParam(value = "fullNumber") String fullNumber, @RequestParam(value = "status") Integer status){ ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo(); connectorStatusInfo.setConnectorID(fullNumber); switch (status){ @@ -1282,7 +1282,7 @@ } chargeDetail.setElecPrice(datum.getElectrovalence()); chargeDetail.setSevicePrice(datum.getServiceCharge()); - chargeDetail.setDetailPower(datum.getChargingCapacity()); + chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_UP)); chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice()); chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice()); chargeDetails.add(chargeDetail); @@ -1489,6 +1489,7 @@ notificationChargeOrderInfo.setStartChargeSeq(startChargeSeq); TChargingOrder chargingOrder = chargingOrderClient.getChargingOrderByStartChargeSeq(startChargeSeq).getData(); if(null == chargingOrder){ + log.info("三方平台流水号获取订单失败"); return null; } TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData(); @@ -1556,7 +1557,7 @@ } chargeDetail.setElecPrice(datum.getElectrovalence()); chargeDetail.setSevicePrice(datum.getServiceCharge()); - chargeDetail.setDetailPower(datum.getChargingCapacity()); + chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_UP)); chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice()); chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice()); chargeDetails.add(chargeDetail); @@ -1751,7 +1752,7 @@ } chargeDetail.setElecPrice(datum.getElectrovalence()); chargeDetail.setSevicePrice(datum.getServiceCharge()); - chargeDetail.setDetailPower(datum.getChargingCapacity()); + chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_EVEN)); chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice()); chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice()); chargeDetails.add(chargeDetail); diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java index db9c638..e45fa9b 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java @@ -15,7 +15,6 @@ import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.iotda.utils.tools.StrategyUtil; import com.ruoyi.integration.rocket.model.*; -import com.ruoyi.integration.rocket.produce.ChargingMessageUtil; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; @@ -49,9 +48,6 @@ @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; - @Resource - private ChargingMessageUtil chargingMessageUtil; - @@ -84,8 +80,9 @@ switch (service_id){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class); - chargingMessage.setOnlineMessage(onlineMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.onlineMessage(onlineMessage); +// chargingMessage.setOnlineMessage(onlineMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 // 业务处理 登录认证应答 OnlineReply onlineReply = new OnlineReply(); @@ -102,8 +99,9 @@ break; case SendTagConstant.PING: PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class); - chargingMessage.setPingMessage(pingMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.pingMessage(pingMessage); +// chargingMessage.setPingMessage(pingMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 Pong pong = new Pong(); pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); @@ -114,22 +112,25 @@ break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class); - chargingMessage.setEndChargeMessage(endChargeMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.endChargeMessage(endChargeMessage); +// chargingMessage.setEndChargeMessage(endChargeMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class); - chargingMessage.setErrorMessageMessage(errorMessageMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.errorMessageMessage(errorMessageMessage); +// chargingMessage.setErrorMessageMessage(errorMessageMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class); - chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.billingModeVerifyMessage(billingModeVerifyMessage); +// chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ @@ -157,8 +158,9 @@ break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class); - chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.acquisitionBillingModeMessage(acquisitionBillingModeMessage); +// chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); @@ -177,55 +179,63 @@ case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: log.info("充电实时数据上传"); UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class); - chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.uploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); +// chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class); - chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.chargingHandshakeMessage(chargingHandshakeMessage); +// chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class); - chargingMessage.setParameterSettingMessage(parameterSettingMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.parameterSettingMessage(parameterSettingMessage); +// chargingMessage.setParameterSettingMessage(parameterSettingMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class); - chargingMessage.setBmsAbortMessage(bmsAbortMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.bmsAbortMessage(bmsAbortMessage); +// chargingMessage.setBmsAbortMessage(bmsAbortMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class); - chargingMessage.setMotorAbortMessage(motorAbortMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.motorAbortMessage(motorAbortMessage); +// chargingMessage.setMotorAbortMessage(motorAbortMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class); - chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.bmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); +// chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class); - chargingMessage.setBmsInformationMessage(bmsInformationMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.bmsInformationMessage(bmsInformationMessage); +// chargingMessage.setBmsInformationMessage(bmsInformationMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class); - chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.chargingPileStartsChargingMessage(chargingPileStartsChargingMessage); +// chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); @@ -239,23 +249,26 @@ break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class); - chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.platformStartChargingReplyMessage(platformStartChargingReplyMessage); +// chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class); - chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.platformStopChargingReplyMessage(platformStopChargingReplyMessage); +// chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class); transactionRecordMessage.setResult(content.toJSONString()); - chargingMessage.setTransactionRecordMessage(transactionRecordMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.transactionRecordMessage(transactionRecordMessage); +// chargingMessage.setTransactionRecordMessage(transactionRecordMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); @@ -265,36 +278,41 @@ break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class); - chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.updateBalanceReplyMessage(updateBalanceReplyMessage); +// chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class); - chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.synchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); +// chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class); - chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.clearOfflineCardReplyMessage(clearOfflineCardReplyMessage); +// chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class); - chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.workingParameterSettingReplyMessage(workingParameterSettingReplyMessage); +// chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class); - chargingMessage.setTimingSettingMessage(timingSettingMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.timingSettingMessage(timingSettingMessage); +// chargingMessage.setTimingSettingMessage(timingSettingMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 对时设置应答 TimingSettingReply timingSettingReply = new TimingSettingReply(); timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); @@ -304,55 +322,62 @@ break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class); - chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.setupBillingModelReplyMessage(setupBillingModelReplyMessage); +// chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class); - chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.groundLockRealTimeDataMessage(groundLockRealTimeDataMessage); +// chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class); - chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.chargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); +// chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class); - chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.platformRestartReplyMessage(platformRestartReplyMessage); +// chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class); - chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.qrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); +// chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class); - chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.securityDetectionMessage(securityDetectionMessage); +// chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class); - chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); - chargingMessageUtil.handleMessage(chargingMessage); + sendResult = enhanceProduce.platformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); +// chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); +// chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; } -// log.info("rocketmq消息下发结果:{}",sendResult); + log.info("rocketmq消息下发结果:{}",sendResult); return AjaxResult.success(); } diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java index 0dc777d..4b45d82 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java @@ -30,76 +30,22 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_acquisition_billing_mode", topic = "charge_acquisition_billing_mode", - selectorExpression = "acquisition_billing_mode", // 明确指定标签 - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "acquisition_billing_mode" ) -public class AcquisitionBillingModeMessageListener extends EnhanceMessageHandler<AcquisitionBillingModeMessage> implements RocketMQListener<AcquisitionBillingModeMessage> { +public class AcquisitionBillingModeMessageListener implements RocketMQListener<AcquisitionBillingModeMessage> { @Autowired private AcquisitionBillingModeService acquisitionBillingModeService; - @Autowired - private AccountingStrategyDetailClient accountingStrategyDetailClient; - @Autowired - private IotMessageProduce iotMessageProduce; - @Autowired - private MessageUtil messageUtil; - @Override - protected void handleMessage(AcquisitionBillingModeMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电桩计费模型请求-业务消息处理:{}",message); - // 持久化消息 - AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); - BeanUtils.copyProperties(message,acquisitionBillingMode); - acquisitionBillingModeService.create(acquisitionBillingMode); - // 业务处理 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 -// List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(message.getCharging_pile_code()).getData(); -// Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); -// // 价格设置 -// AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply(); -// StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply); -// // 时段设置 -// StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply); -// iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply)); - } - - @Override - protected void handleMaxRetriesExceeded(AcquisitionBillingModeMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(AcquisitionBillingModeMessage message) { - // 此处可做消息过滤 - return false; - } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(AcquisitionBillingModeMessage message) { - super.dispatchMessage(message); + log.info("充电桩计费模型请求-业务消息处理:{}",message); + // 持久化消息 + AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); + BeanUtils.copyProperties(message,acquisitionBillingMode); + acquisitionBillingModeService.create(acquisitionBillingMode); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java index fcd2bab..e8cbae8 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java @@ -27,63 +27,22 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_billing_mode_verify", topic = "charge_billing_mode_verify", - selectorExpression = "billing_mode_verify", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "billing_mode_verify" ) -public class BillingModeVerifyMessageListener extends EnhanceMessageHandler<BillingModeVerifyMessage> implements RocketMQListener<BillingModeVerifyMessage> { +public class BillingModeVerifyMessageListener implements RocketMQListener<BillingModeVerifyMessage> { @Autowired private BillingModeVerifyService billingModeVerifyService; - - @Override - protected void handleMessage(BillingModeVerifyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("计费模型验证请求-业务消息处理:{}",message); - // 持久化消息 - BillingModeVerify billingModeVerify = new BillingModeVerify(); - BeanUtils.copyProperties(message,billingModeVerify); - billingModeVerifyService.create(billingModeVerify); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(BillingModeVerifyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(BillingModeVerifyMessage message) { - // 此处可做消息过滤 - return false; - } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BillingModeVerifyMessage message) { - super.dispatchMessage(message); + log.info("计费模型验证请求-业务消息处理:{}",message); + // 持久化消息 + BillingModeVerify billingModeVerify = new BillingModeVerify(); + BeanUtils.copyProperties(message,billingModeVerify); + billingModeVerifyService.create(billingModeVerify); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java index ceeebfc..d1fa0d1 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java @@ -15,6 +15,9 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; @Slf4j @Component @@ -22,22 +25,21 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_bms_abort", topic = "charge_bms_abort", - selectorExpression = "bms_abort", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "bms_abort" ) -public class BmsAbortMessageListener extends EnhanceMessageHandler<BmsAbortMessage> implements RocketMQListener<BmsAbortMessage> { +public class BmsAbortMessageListener implements RocketMQListener<BmsAbortMessage> { @Autowired private BmsAbortService bmsAbortService; @Resource private ChargingOrderClient chargingOrderClient; - - - + + /** + * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 + */ @Override - protected void handleMessage(BmsAbortMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 + public void onMessage(BmsAbortMessage message) { log.info("充电阶段BMS中止-业务消息处理:{}",message); // 持久化消息 BmsAbort bmsAbort = new BmsAbort(); @@ -45,46 +47,5 @@ bmsAbortService.create(bmsAbort); // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); - } - - @Override - protected void handleMaxRetriesExceeded(BmsAbortMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(BmsAbortMessage message) { - // 此处可做消息过滤 - return false; - } - - /** - * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 - */ - @Override - public void onMessage(BmsAbortMessage message) { - super.dispatchMessage(message); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java index a350c1e..a5867e7 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java @@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.Objects; @Slf4j @@ -23,69 +24,32 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_bms_demand_and_charger_exportation", topic = "charge_bms_demand_and_charger_exportation", - selectorExpression = "bms_demand_and_charger_exportation", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "bms_demand_and_charger_exportation" ) -public class BmsDemandAndChargerExportationMessageListener extends EnhanceMessageHandler<BmsDemandAndChargerExportationMessage> implements RocketMQListener<BmsDemandAndChargerExportationMessage> { +public class BmsDemandAndChargerExportationMessageListener implements RocketMQListener<BmsDemandAndChargerExportationMessage> { @Autowired private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; - @Autowired + @Resource private ChargingOrderClient chargingOrderClient; - @Override - protected void handleMessage(BmsDemandAndChargerExportationMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",message); - // 持久化消息 - BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); - BeanUtils.copyProperties(message,bmsDemandAndChargerExportation); - bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); - // 业务处理 - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrder)){ - chargingOrder.setNeedElec(message.getBms_current_requirements()); - chargingOrderClient.updateChargingOrder(chargingOrder); - } - } - - @Override - protected void handleMaxRetriesExceeded(BmsDemandAndChargerExportationMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(BmsDemandAndChargerExportationMessage message) { - // 此处可做消息过滤 - return false; - } + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BmsDemandAndChargerExportationMessage message) { - super.dispatchMessage(message); + log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",message); + // 持久化消息 + BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); + BeanUtils.copyProperties(message,bmsDemandAndChargerExportation); + bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); + + // 业务处理 + TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderBms)){ + chargingOrderBms.setNeedElec(message.getBms_current_requirements()); + chargingOrderClient.updateChargingOrder(chargingOrderBms); + } } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java index b90a685..f8f24b2 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java @@ -1,10 +1,8 @@ package com.ruoyi.integration.rocket.listener; -import com.ruoyi.integration.api.model.BmsDemandAndChargerExportation; import com.ruoyi.integration.api.model.BmsInformation; import com.ruoyi.integration.mongodb.service.BmsInformationService; import com.ruoyi.integration.rocket.model.BmsInformationMessage; -import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -19,62 +17,22 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_bms_information", topic = "charge_bms_information", - selectorExpression = "bms_information", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "bms_information" ) -public class BmsInformationMessageListener extends EnhanceMessageHandler<BmsInformationMessage> implements RocketMQListener<BmsInformationMessage> { +public class BmsInformationMessageListener implements RocketMQListener<BmsInformationMessage> { @Autowired private BmsInformationService bmsInformationService; - - @Override - protected void handleMessage(BmsInformationMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电过程BMS信息-业务消息处理:{}",message); - // 持久化消息 - BmsInformation bmsInformation = new BmsInformation(); - BeanUtils.copyProperties(message,bmsInformation); - bmsInformationService.create(bmsInformation); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(BmsInformationMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(BmsInformationMessage message) { - // 此处可做消息过滤 - return false; - } + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BmsInformationMessage message) { - super.dispatchMessage(message); + log.info("充电过程BMS信息-业务消息处理:{}",message); + // 持久化消息 + BmsInformation bmsInformation = new BmsInformation(); + BeanUtils.copyProperties(message,bmsInformation); + bmsInformationService.create(bmsInformation); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java index 2f5439e..b812bfd 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java @@ -19,62 +19,23 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_handshake", topic = "charge_charging_handshake", - selectorExpression = "charging_handshake", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "charging_handshake" ) -public class ChargingHandshakeMessageListener extends EnhanceMessageHandler<ChargingHandshakeMessage> implements RocketMQListener<ChargingHandshakeMessage> { +public class ChargingHandshakeMessageListener implements RocketMQListener<ChargingHandshakeMessage> { @Autowired private ChargingHandshakeService chargingHandshakeService; - @Override - protected void handleMessage(ChargingHandshakeMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电握手-业务消息处理:{}",message); - // 持久化消息 - ChargingHandshake chargingHandshake = new ChargingHandshake(); - BeanUtils.copyProperties(message,chargingHandshake); - chargingHandshakeService.create(chargingHandshake); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(ChargingHandshakeMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ChargingHandshakeMessage message) { - // 此处可做消息过滤 - return false; - } + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingHandshakeMessage message) { - super.dispatchMessage(message); + log.info("充电握手-业务消息处理:{}",message); + // 持久化消息 + ChargingHandshake chargingHandshake = new ChargingHandshake(); + BeanUtils.copyProperties(message,chargingHandshake); + chargingHandshakeService.create(chargingHandshake); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java index 9964786..108a554 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java @@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_pile_returns_ground_lock_data", topic = "charge_charging_pile_returns_ground_lock_data", - selectorExpression = "charging_pile_returns_ground_lock_data", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "charging_pile_returns_ground_lock_data" ) -public class ChargingPileReturnsGroundLockDataMessageListener extends EnhanceMessageHandler<ChargingPileReturnsGroundLockDataMessage> implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> { +public class ChargingPileReturnsGroundLockDataMessageListener implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> { @Autowired private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; - @Override - protected void handleMessage(ChargingPileReturnsGroundLockDataMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电桩返回数据(上行)-业务消息处理:{}",message); - // 持久化消息 - ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); - BeanUtils.copyProperties(message,chargingPileReturnsGroundLockData); - chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(ChargingPileReturnsGroundLockDataMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ChargingPileReturnsGroundLockDataMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingPileReturnsGroundLockDataMessage message) { - super.dispatchMessage(message); + log.info("充电桩返回数据(上行)-业务消息处理:{}",message); + // 持久化消息 + ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); + BeanUtils.copyProperties(message,chargingPileReturnsGroundLockData); + chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java index 903deaa..4371d11 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java @@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_pile_starts_charging", topic = "charge_charging_pile_starts_charging", - selectorExpression = "charging_pile_starts_charging", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "charging_pile_starts_charging" ) -public class ChargingPileStartsChargingMessageListener extends EnhanceMessageHandler<ChargingPileStartsChargingMessage> implements RocketMQListener<ChargingPileStartsChargingMessage> { +public class ChargingPileStartsChargingMessageListener implements RocketMQListener<ChargingPileStartsChargingMessage> { @Autowired private ChargingPileStartsChargingService chargingPileStartsChargingService; - @Override - protected void handleMessage(ChargingPileStartsChargingMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电桩主动申请启动充电-业务消息处理:{}",message); - // 持久化消息 - ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); - BeanUtils.copyProperties(message,chargingPileStartsCharging); - chargingPileStartsChargingService.create(chargingPileStartsCharging); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(ChargingPileStartsChargingMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ChargingPileStartsChargingMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingPileStartsChargingMessage message) { - super.dispatchMessage(message); + log.info("充电桩主动申请启动充电-业务消息处理:{}",message); + // 持久化消息 + ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); + BeanUtils.copyProperties(message,chargingPileStartsCharging); + chargingPileStartsChargingService.create(chargingPileStartsCharging); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java index 8a82427..43b7678 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java @@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_clear_offline_card_reply", topic = "charge_clear_offline_card_reply", - selectorExpression = "clear_offline_card_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "clear_offline_card_reply" ) -public class ClearOfflineCardReplyMessageListener extends EnhanceMessageHandler<ClearOfflineCardReplyMessage> implements RocketMQListener<ClearOfflineCardReplyMessage> { +public class ClearOfflineCardReplyMessageListener implements RocketMQListener<ClearOfflineCardReplyMessage> { @Autowired private ClearOfflineCardReplyService clearOfflineCardReplyService; - @Override - protected void handleMessage(ClearOfflineCardReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("离线卡数据清除应答-业务消息处理:{}",message); - // 持久化消息 - ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); - BeanUtils.copyProperties(message,clearOfflineCardReply); - clearOfflineCardReplyService.create(clearOfflineCardReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(ClearOfflineCardReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ClearOfflineCardReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ClearOfflineCardReplyMessage message) { - super.dispatchMessage(message); + log.info("离线卡数据清除应答-业务消息处理:{}",message); + // 持久化消息 + ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); + BeanUtils.copyProperties(message,clearOfflineCardReply); + clearOfflineCardReplyService.create(clearOfflineCardReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java index bb8d506..366568e 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java @@ -3,13 +3,17 @@ import com.ruoyi.integration.api.model.EndCharge; import com.ruoyi.integration.api.model.Ping; import com.ruoyi.integration.api.model.Pong; +import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.iotda.enums.ServiceIdMenu; import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce; import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.mongodb.service.EndChargeService; import com.ruoyi.integration.mongodb.service.PingService; +import com.ruoyi.integration.rocket.model.ChargingMessage; +import com.ruoyi.integration.rocket.model.ChargingOrderMessage; import com.ruoyi.integration.rocket.model.EndChargeMessage; import com.ruoyi.integration.rocket.model.PingMessage; +import com.ruoyi.integration.rocket.produce.EnhanceProduce; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import lombok.extern.slf4j.Slf4j; @@ -28,21 +32,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_end_charge", topic = "charge_end_charge", - selectorExpression = "end_charge", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "end_charge" ) -public class EndChargeMessageListener extends EnhanceMessageHandler<EndChargeMessage> implements RocketMQListener<EndChargeMessage> { +public class EndChargeMessageListener implements RocketMQListener<EndChargeMessage> { @Autowired private EndChargeService endChargeService; @Resource private ChargingOrderClient chargingOrderClient; + @Autowired + private EnhanceProduce enhanceProduce; - + + /** + * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 + */ @Override - protected void handleMessage(EndChargeMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 + public void onMessage(EndChargeMessage message) { log.info("充电结束-业务消息处理:{}",message); // 持久化消息 EndCharge endCharge = new EndCharge(); @@ -50,46 +57,19 @@ endChargeService.create(endCharge); // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); - } - - @Override - protected void handleMaxRetriesExceeded(EndChargeMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(EndChargeMessage message) { - // 此处可做消息过滤 - return false; - } - - /** - * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 - */ - @Override - public void onMessage(EndChargeMessage message) { - super.dispatchMessage(message); + // 订单id + String transactionSerialNumber = endCharge.getTransaction_serial_number(); + ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); + chargingOrderMessage.setOrderNumber(transactionSerialNumber); + // 推送充电订单信息 + ChargingMessage chargingMessage1 = new ChargingMessage(); + chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); + chargingMessage1.setOrderMessage(chargingOrderMessage); + enhanceProduce.orderInfoMessage(chargingMessage1); + // 推送充电订单状态 + ChargingMessage chargingMessage2 = new ChargingMessage(); + chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); + chargingMessage2.setOrderMessage(chargingOrderMessage); + enhanceProduce.orderStatusMessage(chargingMessage2); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java index 8c67b98..fc1392c 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java @@ -17,64 +17,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_error_message", topic = "charge_error_message", - selectorExpression = "error_message", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "error_message" ) -public class ErrorMessageMessageListener extends EnhanceMessageHandler<ErrorMessageMessage> implements RocketMQListener<ErrorMessageMessage> { +public class ErrorMessageMessageListener implements RocketMQListener<ErrorMessageMessage> { @Autowired private ErrorMessageMessageService errorMessageMessageService; - - - @Override - protected void handleMessage(ErrorMessageMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("错误报文-业务消息处理:{}",message); - // 持久化消息 - ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); - BeanUtils.copyProperties(message,errorMessageMessage); - errorMessageMessageService.create(errorMessageMessage); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(ErrorMessageMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ErrorMessageMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ErrorMessageMessage message) { - super.dispatchMessage(message); + log.info("错误报文-业务消息处理:{}",message); + // 持久化消息 + ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); + BeanUtils.copyProperties(message,errorMessageMessage); + errorMessageMessageService.create(errorMessageMessage); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java index 91768db..6e5d624 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java @@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_ground_lock_real_time_data", topic = "charge_ground_lock_real_time_data", - selectorExpression = "ground_lock_real_time_data", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "ground_lock_real_time_data" ) -public class GroundLockRealTimeDataMessageListener extends EnhanceMessageHandler<GroundLockRealTimeDataMessage> implements RocketMQListener<GroundLockRealTimeDataMessage> { +public class GroundLockRealTimeDataMessageListener implements RocketMQListener<GroundLockRealTimeDataMessage> { @Autowired private GroundLockRealTimeDataService groundLockRealTimeDataService; - @Override - protected void handleMessage(GroundLockRealTimeDataMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",message); - // 持久化消息 - GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); - BeanUtils.copyProperties(message,groundLockRealTimeData); - groundLockRealTimeDataService.create(groundLockRealTimeData); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(GroundLockRealTimeDataMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(GroundLockRealTimeDataMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(GroundLockRealTimeDataMessage message) { - super.dispatchMessage(message); + log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",message); + // 持久化消息 + GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); + BeanUtils.copyProperties(message,groundLockRealTimeData); + groundLockRealTimeDataService.create(groundLockRealTimeData); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java index e6c6dc8..69fd377 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java @@ -22,10 +22,9 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_motor_abort", topic = "charge_motor_abort", - selectorExpression = "motor_abort", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "motor_abort" ) -public class MotorAbortMessageListener extends EnhanceMessageHandler<MotorAbortMessage> implements RocketMQListener<MotorAbortMessage> { +public class MotorAbortMessageListener implements RocketMQListener<MotorAbortMessage> { @Autowired private MotorAbortService motorAbortService; @@ -34,11 +33,11 @@ private ChargingOrderClient chargingOrderClient; - - + /** + * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 + */ @Override - protected void handleMessage(MotorAbortMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 + public void onMessage(MotorAbortMessage message) { log.info("充电阶段充电机中止-业务消息处理:{}",message); // 持久化消息 MotorAbort motorAbort = new MotorAbort(); @@ -46,46 +45,5 @@ motorAbortService.create(motorAbort); // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); - } - - @Override - protected void handleMaxRetriesExceeded(MotorAbortMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(MotorAbortMessage message) { - // 此处可做消息过滤 - return false; - } - - /** - * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 - */ - @Override - public void onMessage(MotorAbortMessage message) { - super.dispatchMessage(message); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java index 496d44a..bdb90e1 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java @@ -1,5 +1,6 @@ package com.ruoyi.integration.rocket.listener; +import com.alibaba.fastjson.JSON; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.mongodb.service.OnlineService; import com.ruoyi.integration.rocket.model.OnlineMessage; @@ -16,63 +17,22 @@ @RocketMQMessageListener( consumerGroup = "charge_online", topic = "charge_online", - selectorExpression = "online", // 明确指定标签 - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "online" ) -public class OnlineMessageListener extends EnhanceMessageHandler<OnlineMessage> implements RocketMQListener<OnlineMessage> { +public class OnlineMessageListener implements RocketMQListener<OnlineMessage> { @Autowired private OnlineService onlineService; - - @Override - protected void handleMessage(OnlineMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电桩登录认证业务消息处理:{}",message); - // 持久化消息 - Online online = new Online(); - BeanUtils.copyProperties(message,online); - onlineService.create(online); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(OnlineMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(OnlineMessage message) { - // 此处可做消息过滤 - return false; - } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(OnlineMessage message) { - super.dispatchMessage(message); + log.info("充电桩登录认证业务消息处理:{}", JSON.toJSONString(message)); + // 持久化消息 + Online online = new Online(); + BeanUtils.copyProperties(message,online); + onlineService.create(online); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java index d5d2c33..f46c414 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java @@ -17,65 +17,26 @@ @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + consumerGroup = "charge_parameter_setting", + topic = "charge_parameter_setting", + selectorExpression = "parameter_setting" ) -public class ParameterSettingMessageListener extends EnhanceMessageHandler<ParameterSettingMessage> implements RocketMQListener<ParameterSettingMessage> { +public class ParameterSettingMessageListener implements RocketMQListener<ParameterSettingMessage> { @Autowired private ParameterSettingService parameterSettingService; - - @Override - protected void handleMessage(ParameterSettingMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("业务消息处理:{}",message); - // 持久化消息 - ParameterSetting parameterSetting = new ParameterSetting(); - BeanUtils.copyProperties(message,parameterSetting); - parameterSettingService.create(parameterSetting); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(ParameterSettingMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ParameterSettingMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ParameterSettingMessage message) { - super.dispatchMessage(message); + log.info("参数配置-业务消息处理:{}",message); + // 持久化消息 + ParameterSetting parameterSetting = new ParameterSetting(); + BeanUtils.copyProperties(message,parameterSetting); + parameterSettingService.create(parameterSetting); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java index 3b67a82..0abe121 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java @@ -1,18 +1,30 @@ package com.ruoyi.integration.rocket.listener; +import com.alibaba.fastjson2.JSON; +import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; +import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.integration.api.model.Ping; +import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.PingService; +import com.ruoyi.integration.rocket.model.ChargingMessage; +import com.ruoyi.integration.rocket.model.GunStatusMessage; import com.ruoyi.integration.rocket.model.OnlineMessage; import com.ruoyi.integration.rocket.model.PingMessage; +import com.ruoyi.integration.rocket.produce.EnhanceProduce; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; @Slf4j @Component @@ -20,62 +32,47 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_ping", topic = "charge_ping", - selectorExpression = "ping", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "ping" ) -public class PingMessageListener extends EnhanceMessageHandler<PingMessage> implements RocketMQListener<PingMessage> { +public class PingMessageListener implements RocketMQListener<PingMessage> { @Autowired private PingService pingService; - @Override - protected void handleMessage(PingMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电桩心跳包-业务消息处理:{}",message); - // 持久化消息 - Ping ping = new Ping(); - BeanUtils.copyProperties(message,ping); - pingService.create(ping); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(PingMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(PingMessage message) { - // 此处可做消息过滤 - return false; - } + @Resource + private RedisTemplate redisTemplate; + @Autowired + private EnhanceProduce enhanceProduce; + + @Resource + private ChargingPileClient chargingPileClient; + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PingMessage message) { - super.dispatchMessage(message); + log.info("充电桩心跳包-业务消息处理:{}", JSON.toJSONString(message)); + // 持久化消息 + Ping ping = new Ping(); + BeanUtils.copyProperties(message,ping); + pingService.save(ping); + //存储缓存中,5分钟有效 + redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); + + UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); + vo1.setGun_code(message.getCharging_gun_code()); + vo1.setPile_code(message.getCharging_pile_code()); + vo1.setStatus(message.getCharging_gun_status()); + chargingPileClient.updateChargingPileStatus(vo1); + // 监管平台推送充电设备状态 + SendResult sendResult; + String gunCode = message.getCharging_pile_code() + message.getCharging_gun_code(); + ChargingMessage chargingMessage = new ChargingMessage(); + chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); + GunStatusMessage gunStatusMessage = new GunStatusMessage(); + gunStatusMessage.setFullNumber(gunCode); + chargingMessage.setGunStatusMessage(gunStatusMessage); + sendResult = enhanceProduce.gunStatusMessage(chargingMessage); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java index 74f0e41..818128c 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java @@ -1,6 +1,7 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.Online; +import com.ruoyi.integration.api.model.ParameterSetting; import com.ruoyi.integration.api.model.PlatformRemoteUpdateReply; import com.ruoyi.integration.mongodb.service.PlatformRemoteUpdateReplyService; import com.ruoyi.integration.rocket.model.PlatformRemoteUpdateReplyMessage; @@ -19,63 +20,23 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_remote_update_reply", topic = "charge_platform_remote_update_reply", - selectorExpression = "platform_remote_update_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "platform_remote_update_reply" ) -public class PlatformRemoteUpdateReplyMessageListener extends EnhanceMessageHandler<PlatformRemoteUpdateReplyMessage> implements RocketMQListener<PlatformRemoteUpdateReplyMessage> { +public class PlatformRemoteUpdateReplyMessageListener implements RocketMQListener<PlatformRemoteUpdateReplyMessage> { @Autowired private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; - - @Override - protected void handleMessage(PlatformRemoteUpdateReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("远程更新应答-业务消息处理:{}",message); - // 持久化消息 - PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); - BeanUtils.copyProperties(message,platformRemoteUpdateReply); - platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(PlatformRemoteUpdateReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(PlatformRemoteUpdateReplyMessage message) { - // 此处可做消息过滤 - return false; - } + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformRemoteUpdateReplyMessage message) { - super.dispatchMessage(message); + log.info("业务消息处理:{}",message); + // 持久化消息 + PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); + BeanUtils.copyProperties(message,platformRemoteUpdateReply); + platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java index 4da5259..a512a5a 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java @@ -19,63 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_restart_reply", topic = "charge_platform_restart_reply", - selectorExpression = "platform_restart_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "platform_restart_reply" ) -public class PlatformRestartReplyMessageListener extends EnhanceMessageHandler<PlatformRestartReplyMessage> implements RocketMQListener<PlatformRestartReplyMessage> { +public class PlatformRestartReplyMessageListener implements RocketMQListener<PlatformRestartReplyMessage> { @Autowired private PlatformRestartReplyService platformRestartReplyService; - - @Override - protected void handleMessage(PlatformRestartReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("远程重启应答-业务消息处理:{}",message); - // 持久化消息 - PlatformRestartReply platformRestartReply = new PlatformRestartReply(); - BeanUtils.copyProperties(message,platformRestartReply); - platformRestartReplyService.create(platformRestartReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(PlatformRestartReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(PlatformRestartReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformRestartReplyMessage message) { - super.dispatchMessage(message); + log.info("远程重启应答-业务消息处理:{}",message); + // 持久化消息 + PlatformRestartReply platformRestartReply = new PlatformRestartReply(); + BeanUtils.copyProperties(message,platformRestartReply); + platformRestartReplyService.create(platformRestartReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java index a22254f..decb747 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java @@ -23,10 +23,9 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_start_charging_reply", topic = "charge_platform_start_charging_reply", - selectorExpression = "platform_start_charging_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "platform_start_charging_reply" ) -public class PlatformStartChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStartChargingReplyMessage> implements RocketMQListener<PlatformStartChargingReplyMessage> { +public class PlatformStartChargingReplyMessageListener implements RocketMQListener<PlatformStartChargingReplyMessage> { @Autowired private PlatformStartChargingReplyService platformStartChargingReplyService; @@ -36,58 +35,20 @@ - @Override - protected void handleMessage(PlatformStartChargingReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("远程启机命令回复-业务消息处理:{}",message); - // 持久化消息 - PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); - BeanUtils.copyProperties(message,platformStartChargingReply); - platformStartChargingReplyService.create(platformStartChargingReply); - // 业务处理 - PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); - BeanUtils.copyProperties(message, message1); - chargingOrderClient.startChargeSuccessfully(message1); - } - - @Override - protected void handleMaxRetriesExceeded(PlatformStartChargingReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(PlatformStartChargingReplyMessage message) { - // 此处可做消息过滤 - return false; - } - /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformStartChargingReplyMessage message) { - super.dispatchMessage(message); + log.info("远程启机命令回复-业务消息处理:{}",message); + // 持久化消息 + PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); + BeanUtils.copyProperties(message,platformStartChargingReply); + platformStartChargingReplyService.create(platformStartChargingReply); + + // 业务处理 + PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); + BeanUtils.copyProperties(message, message1); + chargingOrderClient.startChargeSuccessfully(message1); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java index 6fb3c54..4d737cf 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java @@ -5,6 +5,8 @@ import com.ruoyi.integration.mongodb.service.PlatformStopChargingReplyService; import com.ruoyi.integration.rocket.model.PlatformStopChargingReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; +import com.ruoyi.order.api.feignClient.ChargingOrderClient; +import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -13,68 +15,37 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + @Slf4j @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_stop_charging_reply", topic = "charge_platform_stop_charging_reply", - selectorExpression = "platform_stop_charging_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "platform_stop_charging_reply" ) -public class PlatformStopChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStopChargingReplyMessage> implements RocketMQListener<PlatformStopChargingReplyMessage> { +public class PlatformStopChargingReplyMessageListener implements RocketMQListener<PlatformStopChargingReplyMessage> { @Autowired private PlatformStopChargingReplyService platformStopChargingReplyService; - - @Override - protected void handleMessage(PlatformStopChargingReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("远程停机命令回复-业务消息处理:{}",message); - // 持久化消息 - PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); - BeanUtils.copyProperties(message,platformStopChargingReply); - platformStopChargingReplyService.create(platformStopChargingReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(PlatformStopChargingReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(PlatformStopChargingReplyMessage message) { - // 此处可做消息过滤 - return false; - } + @Resource + private ChargingOrderClient chargingOrderClient; + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformStopChargingReplyMessage message) { - super.dispatchMessage(message); + log.info("远程停机命令回复-业务消息处理:{}",message); + // 持久化消息 + PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); + BeanUtils.copyProperties(message,platformStopChargingReply); + platformStopChargingReplyService.create(platformStopChargingReply); + + PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); + BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); + chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java new file mode 100644 index 0000000..50ca0dc --- /dev/null +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java @@ -0,0 +1,40 @@ +package com.ruoyi.integration.rocket.listener; + +import com.ruoyi.integration.api.model.QrCodeDeliveryReply; +import com.ruoyi.integration.mongodb.service.QrCodeDeliveryReplyService; +import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage; +import com.ruoyi.integration.rocket.model.QrCodeDeliveryReplyMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author zhibing.pu + * @Date 2025/4/28 14:57 + */ +@Slf4j +@Component +@RocketMQMessageListener( + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_qr_code_delivery_reply", + topic = "charge_qr_code_delivery_reply", + selectorExpression = "qr_code_delivery_reply" +) +public class QrCodeDeliveryReplyMessageListener implements RocketMQListener<QrCodeDeliveryReplyMessage> { + + @Autowired + private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; + + + @Override + public void onMessage(QrCodeDeliveryReplyMessage message) { + log.info("二维码下发应答-业务消息处理:{}",message); + QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); + BeanUtils.copyProperties(message,qrCodeDeliveryReply); + qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); + } +} diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java index 8f569da..4174407 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java @@ -1,10 +1,13 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.Online; +import com.ruoyi.integration.api.model.PlatformRemoteUpdateReply; +import com.ruoyi.integration.api.model.PlatformStopChargingReply; import com.ruoyi.integration.api.model.QueryOfflineCardReply; import com.ruoyi.integration.mongodb.service.QueryOfflineCardReplyService; import com.ruoyi.integration.rocket.model.QueryOfflineCardReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; +import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -17,64 +20,27 @@ @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + consumerGroup = "charge_query_offline_card_reply", + topic = "charge_query_offline_card_reply", + selectorExpression = "query_offline_card_reply" ) -public class QueryOfflineCardReplyMessageListener extends EnhanceMessageHandler<QueryOfflineCardReplyMessage> implements RocketMQListener<QueryOfflineCardReplyMessage> { +public class QueryOfflineCardReplyMessageListener implements RocketMQListener<QueryOfflineCardReplyMessage> { @Autowired private QueryOfflineCardReplyService queryOfflineCardReplyService; - @Override - protected void handleMessage(QueryOfflineCardReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("离线卡数据查询应答-业务消息处理:{}",message); - // 持久化消息 - QueryOfflineCardReply queryOfflineCardReply = new QueryOfflineCardReply(); - BeanUtils.copyProperties(message,queryOfflineCardReply); - queryOfflineCardReplyService.create(queryOfflineCardReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(QueryOfflineCardReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(QueryOfflineCardReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(QueryOfflineCardReplyMessage message) { - super.dispatchMessage(message); + log.info("业务消息处理:{}",message); + // 持久化消息 + QueryOfflineCardReply queryOfflineCardReply = new QueryOfflineCardReply(); + BeanUtils.copyProperties(message,queryOfflineCardReply); + queryOfflineCardReplyService.create(queryOfflineCardReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java new file mode 100644 index 0000000..42ef6c7 --- /dev/null +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java @@ -0,0 +1,50 @@ +package com.ruoyi.integration.rocket.listener; + +import com.ruoyi.integration.api.model.SecurityDetection; +import com.ruoyi.integration.mongodb.service.SecurityDetectionService; +import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage; +import com.ruoyi.integration.rocket.model.SecurityDetectionMessage; +import com.ruoyi.order.api.feignClient.ChargingOrderClient; +import com.ruoyi.order.api.vo.SecurityDetectionVO; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author zhibing.pu + * @Date 2025/4/28 14:59 + */ +@Slf4j +@Component +@RocketMQMessageListener( + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_security_detection", + topic = "charge_security_detection", + selectorExpression = "security_detection" +) +public class SecurityDetectionMessageListener implements RocketMQListener<SecurityDetectionMessage> { + @Resource + private ChargingOrderClient chargingOrderClient; + @Autowired + private SecurityDetectionService securityDetectionService; + + + + @Override + public void onMessage(SecurityDetectionMessage message) { + log.info("安全监测-业务消息处理:{}",message); + SecurityDetection securityDetection = new SecurityDetection(); + BeanUtils.copyProperties(message,securityDetection); + securityDetectionService.create(securityDetection); + + SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); + BeanUtils.copyProperties(securityDetection, securityDetection1); + chargingOrderClient.securityDetection(securityDetection1); + } +} diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java index 1866383..c6b3a0c 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java @@ -19,63 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_setup_billing_model_reply", topic = "charge_setup_billing_model_reply", - selectorExpression = "setup_billing_model_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "setup_billing_model_reply" ) -public class SetupBillingModelReplyMessageListener extends EnhanceMessageHandler<SetupBillingModelReplyMessage> implements RocketMQListener<SetupBillingModelReplyMessage> { +public class SetupBillingModelReplyMessageListener implements RocketMQListener<SetupBillingModelReplyMessage> { @Autowired private SetupBillingModelReplyService setupBillingModelReplyService; - - @Override - protected void handleMessage(SetupBillingModelReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("计费模型应答-业务消息处理:{}",message); - // 持久化消息 - SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); - BeanUtils.copyProperties(message,setupBillingModelReply); - setupBillingModelReplyService.create(setupBillingModelReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(SetupBillingModelReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(SetupBillingModelReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(SetupBillingModelReplyMessage message) { - super.dispatchMessage(message); + log.info("计费模型应答-业务消息处理:{}",message); + // 持久化消息 + SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); + BeanUtils.copyProperties(message,setupBillingModelReply); + setupBillingModelReplyService.create(setupBillingModelReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java index 52212cd..9693c43 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java @@ -19,62 +19,25 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_synchronize_offline_card_reply", topic = "charge_synchronize_offline_card_reply", - selectorExpression = "synchronize_offline_card_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "synchronize_offline_card_reply" ) -public class SynchronizeOfflineCardReplyMessageListener extends EnhanceMessageHandler<SynchronizeOfflineCardReplyMessage> implements RocketMQListener<SynchronizeOfflineCardReplyMessage> { +public class SynchronizeOfflineCardReplyMessageListener implements RocketMQListener<SynchronizeOfflineCardReplyMessage> { @Autowired private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; - @Override - protected void handleMessage(SynchronizeOfflineCardReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("卡数据同步应答-业务消息处理:{}",message); - // 持久化消息 - SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); - BeanUtils.copyProperties(message,synchronizeOfflineCardReply); - synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(SynchronizeOfflineCardReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(SynchronizeOfflineCardReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(SynchronizeOfflineCardReplyMessage message) { - super.dispatchMessage(message); + log.info("卡数据同步应答-业务消息处理:{}",message); + // 持久化消息 + SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); + BeanUtils.copyProperties(message,synchronizeOfflineCardReply); + synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java index 635ed19..fb5332e 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java @@ -26,62 +26,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_timing_setting", topic = "charge_timing_setting", - selectorExpression = "timing_setting", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "timing_setting" ) -public class TimingSettingMessageListener extends EnhanceMessageHandler<TimingSettingMessage> implements RocketMQListener<TimingSettingMessage> { +public class TimingSettingMessageListener implements RocketMQListener<TimingSettingMessage> { @Autowired private TimingSettingService timingSettingService; - @Override - protected void handleMessage(TimingSettingMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("对时设置-业务消息处理:{}",message); - // 持久化消息 - TimingSetting timingSetting = new TimingSetting(); - BeanUtils.copyProperties(message,timingSetting); - timingSettingService.create(timingSetting); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(TimingSettingMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(TimingSettingMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(TimingSettingMessage message) { - super.dispatchMessage(message); + log.info("对时设置-业务消息处理:{}",message); + // 持久化消息 + TimingSetting timingSetting = new TimingSetting(); + BeanUtils.copyProperties(message,timingSetting); + timingSettingService.create(timingSetting); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java index 9019abf..68e41f0 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java @@ -1,24 +1,31 @@ package com.ruoyi.integration.rocket.listener; +import com.alibaba.fastjson.JSONObject; import com.ruoyi.integration.api.model.ConfirmTransactionRecord; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.api.model.TransactionRecord; +import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData; +import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.iotda.enums.ServiceIdMenu; import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce; import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.mongodb.service.TransactionRecordService; +import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService; import com.ruoyi.integration.rocket.model.TransactionRecordMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; +import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.Objects; @Slf4j @@ -27,69 +34,56 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_transaction_record", topic = "charge_transaction_record", - selectorExpression = "transaction_record", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "transaction_record" ) -public class TransactionRecordMessageListener extends EnhanceMessageHandler<TransactionRecordMessage> implements RocketMQListener<TransactionRecordMessage> { +public class TransactionRecordMessageListener implements RocketMQListener<TransactionRecordMessage> { @Autowired private TransactionRecordService transactionRecordService; @Autowired private ChargingOrderClient chargingOrderClient; - @Override - protected void handleMessage(TransactionRecordMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("交易记录-业务消息处理:{}",message); - // 持久化消息 - TransactionRecord transactionRecord = new TransactionRecord(); - BeanUtils.copyProperties(message,transactionRecord); - transactionRecordService.create(transactionRecord); - // 业务处理 - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrder)){ - chargingOrder.setTotalElectricity(message.getTotal_electricity()); - chargingOrderClient.updateChargingOrder(chargingOrder); - } - } - - @Override - protected void handleMaxRetriesExceeded(TransactionRecordMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(TransactionRecordMessage message) { - // 此处可做消息过滤 - return false; - } + @Autowired + private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; + + @Resource + private RedisTemplate redisTemplate; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(TransactionRecordMessage message) { - super.dispatchMessage(message); + log.info("交易记录-业务消息处理:{}",message); + message.setResult(JSONObject.toJSONString(message)); + // 持久化消息 + TransactionRecord transactionRecord = new TransactionRecord(); + BeanUtils.copyProperties(message,transactionRecord); + transactionRecord.setResult(message.getResult()); + transactionRecordService.create(transactionRecord); + + // 业务处理 + TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderRecord)){ + chargingOrderRecord.setTotalElectricity(message.getTotal_electricity()); + chargingOrderClient.updateChargingOrder(chargingOrderRecord); + } + //计算费用 + TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); + BeanUtils.copyProperties(message,vo); + int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); + if(200 != code){ + //失败后添加到队列中继续处理数据 + redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, message.getTransaction_serial_number()); + } + + // 添加实时上传记录结束记录 + // 查询mogondb上一条数据 + UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); + if(Objects.nonNull(data) && data.getStatus() != 5){ + UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); + BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); + uploadRealTimeMonitoringData.setStatus(5); + uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); + } } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java index 23eb6bb..eaab58b 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java @@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_update_balance_reply", topic = "charge_update_balance_reply", - selectorExpression = "update_balance_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "update_balance_reply" ) -public class UpdateBalanceReplyMessageListener extends EnhanceMessageHandler<UpdateBalanceReplyMessage> implements RocketMQListener<UpdateBalanceReplyMessage> { +public class UpdateBalanceReplyMessageListener implements RocketMQListener<UpdateBalanceReplyMessage> { @Autowired private UpdateBalanceReplyService updateBalanceReplyService; - @Override - protected void handleMessage(UpdateBalanceReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("余额更新应答-业务消息处理:{}",message); - // 持久化消息 - UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); - BeanUtils.copyProperties(message,updateBalanceReply); - updateBalanceReplyService.create(updateBalanceReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(UpdateBalanceReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(UpdateBalanceReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(UpdateBalanceReplyMessage message) { - super.dispatchMessage(message); + log.info("余额更新应答-业务消息处理:{}",message); + // 持久化消息 + UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); + BeanUtils.copyProperties(message,updateBalanceReply); + updateBalanceReplyService.create(updateBalanceReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java index f3eb7bf..0cb4875 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java @@ -10,8 +10,12 @@ import com.ruoyi.integration.api.feignClient.TCECClient; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData; +import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService; +import com.ruoyi.integration.rocket.model.ChargingMessage; +import com.ruoyi.integration.rocket.model.ChargingOrderMessage; import com.ruoyi.integration.rocket.model.UploadRealTimeMonitoringDataMessage; +import com.ruoyi.integration.rocket.produce.EnhanceProduce; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; @@ -36,10 +40,9 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_upload_real_time_monitoring_data", topic = "charge_upload_real_time_monitoring_data", - selectorExpression = "upload_real_time_monitoring_data", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "upload_real_time_monitoring_data" ) -public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { +public class UploadRealTimeMonitoringDataMessageListener implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @@ -48,129 +51,65 @@ private ChargingOrderClient chargingOrderClient; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; - @Resource - private ChargingGunClient chargingGunClient; - @Resource - private FaultMessageClient faultMessageClient; + @Autowired + private EnhanceProduce enhanceProduce; + + - @Resource - private TCECClient tcecClient; - - - @Override - protected void handleMessage(UploadRealTimeMonitoringDataMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("上传实时监测数据-业务消息处理:{}",message); - // 持久化消息 - UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); - BeanUtils.copyProperties(message,uploadRealTimeMonitoringData); - // 查询mogondb上一条数据 - UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); - // 查询订单 - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); - // 查询当前时间段的计费策略 - TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); - uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); - uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); - if (Objects.nonNull(data)) { - uploadRealTimeMonitoringData.setLast_time(data.getLast_time()); - uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().divide(data.getPaid_amount())); - uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().divide(data.getCharging_degree())); - uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); - }else { - log.info("首次上传实时监测数据"); - uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount()); - uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree()); - uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); - } - uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); - // 业务处理 - UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); - BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); - chargingOrderClient.chargeMonitoring(query); - GetChargingGunByCode code = new GetChargingGunByCode(); - code.setCharging_pile_code(message.getCharging_pile_code()); - code.setCharging_gun_code(message.getCharging_gun_code()); - TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData(); - if(Objects.nonNull(chargingGun)){ - // 存储状态信息 - TFaultMessage faultMessage = new TFaultMessage(); - if(message.getCharging_gun_status().equals(0) || message.getCharging_gun_status().equals(1)){ - faultMessage.setSiteId(chargingGun.getSiteId()); - faultMessage.setChargingPileId(chargingGun.getChargingPileId()); - faultMessage.setChargingGunId(chargingGun.getId()); - switch (message.getCharging_gun_status()){ - case 0: - faultMessage.setStatus(1); - chargingGun.setStatus(1); - break; - case 1: - faultMessage.setStatus(2); - chargingGun.setStatus(7); - break; - } - faultMessage.setDownTime(LocalDateTime.now()); - faultMessageClient.createFaultMessage(faultMessage); - }else { - switch (message.getCharging_gun_status()){ - case 2: - chargingGun.setStatus(2); - break; - case 3: - chargingGun.setStatus(4); - break; - } - // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录 - faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData(); - if(Objects.nonNull(faultMessage)){ - faultMessage.setEndTime(LocalDateTime.now()); - faultMessageClient.updateFaultMessage(faultMessage); - } - } - chargingGunClient.updateChargingGunById(chargingGun); - //推送状态给三方平台 - tcecClient.pushChargingGunStatus(chargingGun.getFullNumber(), chargingGun.getStatus()); - } - } - - @Override - protected void handleMaxRetriesExceeded(UploadRealTimeMonitoringDataMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(UploadRealTimeMonitoringDataMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(UploadRealTimeMonitoringDataMessage message) { - super.dispatchMessage(message); + try { + log.info("上传实时监测数据-业务消息处理:{}",message); + // 持久化消息 + UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); + BeanUtils.copyProperties(message,uploadRealTimeMonitoringData); + // 查询mogondb上一条数据 + UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); + // 查询订单 + TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); + // 查询当前时间段的计费策略 + TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); + uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); + uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); + if (Objects.nonNull(data)) { + uploadRealTimeMonitoringDataService.updateById(data.getId()); + uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().subtract(data.getPaid_amount())); + uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().subtract(data.getCharging_degree())); + uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); + }else { + log.info("首次上传实时监测数据"); + uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount()); + uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree()); + uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); + } + uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); + uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); + uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); + int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); + if(i == 0){ + log.error("数据存储mongo失败"); + } + + // 业务处理 + UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); + BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); + chargingOrderClient.chargeMonitoring(query); + // 订单id + ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); + chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); + // 推送充电订单信息 + ChargingMessage chargingMessage4 = new ChargingMessage(); + chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); + chargingMessage4.setOrderMessage(chargingOrderMessage3); + enhanceProduce.orderInfoMessage(chargingMessage4); + } catch (Exception e) { + e.printStackTrace(); + } } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java index 3e4f3fb..b8a5ace 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java @@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_working_parameter_setting_reply", topic = "charge_working_parameter_setting_reply", - selectorExpression = "working_parameter_setting_reply", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + selectorExpression = "working_parameter_setting_reply" ) -public class WorkingParameterSettingReplyMessageListener extends EnhanceMessageHandler<WorkingParameterSettingReplyMessage> implements RocketMQListener<WorkingParameterSettingReplyMessage> { +public class WorkingParameterSettingReplyMessageListener implements RocketMQListener<WorkingParameterSettingReplyMessage> { @Autowired private WorkingParameterSettingReplyService workingParameterSettingReplyService; - @Override - protected void handleMessage(WorkingParameterSettingReplyMessage message) throws Exception { - // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 - log.info("充电桩工作参数设置应答-业务消息处理:{}",message); - // 持久化消息 - WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); - BeanUtils.copyProperties(message,workingParameterSettingReply); - workingParameterSettingReplyService.create(workingParameterSettingReply); - // 业务处理 - } - - @Override - protected void handleMaxRetriesExceeded(WorkingParameterSettingReplyMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(WorkingParameterSettingReplyMessage message) { - // 此处可做消息过滤 - return false; - } + + /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(WorkingParameterSettingReplyMessage message) { - super.dispatchMessage(message); + log.info("充电桩工作参数设置应答-业务消息处理:{}",message); + // 持久化消息 + WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); + BeanUtils.copyProperties(message,workingParameterSettingReply); + workingParameterSettingReplyService.create(workingParameterSettingReply); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java index f410000..36399fc 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java @@ -1,628 +1,625 @@ -package com.ruoyi.integration.rocket.produce; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; -import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; -import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; -import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient; -import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; -import com.ruoyi.chargingPile.api.model.TChargingGun; -import com.ruoyi.chargingPile.api.model.TFaultMessage; -import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode; -import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; -import com.ruoyi.common.redis.service.RedisService; -import com.ruoyi.integration.api.model.*; -import com.ruoyi.integration.drainage.TCECPushUtil; -import com.ruoyi.integration.iotda.constant.SendTagConstant; -import com.ruoyi.integration.iotda.enums.ServiceIdMenu; -import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter; -import com.ruoyi.integration.mongodb.service.*; -import com.ruoyi.integration.rocket.model.*; -import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; -import com.ruoyi.order.api.feignClient.ChargingOrderClient; -import com.ruoyi.order.api.model.TChargingOrder; -import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; -import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; -import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; -import com.ruoyi.order.api.vo.SecurityDetectionVO; -import com.ruoyi.order.api.vo.TransactionRecordMessageVO; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.math.RoundingMode; -import java.time.LocalDateTime; -import java.util.Date; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Component -@RocketMQMessageListener( - messageModel = MessageModel.CLUSTERING, - consumerGroup = "charge_charging_message", - topic = "charge_charging_message", - selectorExpression = "charging_message", - consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 -) -public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { - - @Autowired - private AcquisitionBillingModeService acquisitionBillingModeService; - @Autowired - private BillingModeVerifyService billingModeVerifyService; - @Autowired - private BmsAbortService bmsAbortService; - @Resource - private ChargingOrderClient chargingOrderClient; - @Autowired - private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; - @Autowired - private OnlineService onlineService; - @Autowired - private PingService pingService; - @Autowired - private EndChargeService endChargeService; - @Autowired - private ErrorMessageMessageService errorMessageMessageService; - @Autowired - private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; - @Resource - private AccountingStrategyDetailClient accountingStrategyDetailClient; - @Autowired - private ChargingHandshakeService chargingHandshakeService; - @Autowired - private ParameterSettingService parameterSettingService; - @Autowired - private MotorAbortService motorAbortService; - @Autowired - private BmsInformationService bmsInformationService; - @Autowired - private ChargingPileStartsChargingService chargingPileStartsChargingService; - @Autowired - private PlatformStartChargingReplyService platformStartChargingReplyService; - @Autowired - private PlatformStopChargingReplyService platformStopChargingReplyService; - @Autowired - private TransactionRecordService transactionRecordService; - @Autowired - private UpdateBalanceReplyService updateBalanceReplyService; - @Autowired - private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; - @Autowired - private ClearOfflineCardReplyService clearOfflineCardReplyService; - @Autowired - private WorkingParameterSettingReplyService workingParameterSettingReplyService; - @Autowired - private TimingSettingService timingSettingService; - @Autowired - private SetupBillingModelReplyService setupBillingModelReplyService; - @Autowired - private GroundLockRealTimeDataService groundLockRealTimeDataService; - @Autowired - private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; - @Autowired - private PlatformRestartReplyService platformRestartReplyService; - @Autowired - private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; - @Autowired - private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; - @Autowired - private SecurityDetectionService securityDetectionService; - @Autowired - private TCECPushUtil tcecPushUtil; - - @Resource - private ChargingPileClient chargingPileClient; - @Resource - private ChargingGunClient chargingGunClient; - - @Resource - private RedisTemplate redisTemplate; - - @Autowired - private EnhanceProduce enhanceProduce; - - - - @StreamListener("input") - @Override - protected void handleMessage(ChargingMessage message) throws Exception { - log.info("rocket收到的消息内容:{}",message); - String serviceId = message.getServiceId(); - if(!StringUtils.hasLength(serviceId)){ - return; - } - log.info("rocket收到的消息内容:{} {}", serviceId,message); - switch (serviceId){ - case SendTagConstant.ONLINE: - OnlineMessage onlineMessage = message.getOnlineMessage(); - log.info("充电桩登录认证业务消息处理:{}",onlineMessage); - // 持久化消息 - Online online = new Online(); - BeanUtils.copyProperties(onlineMessage,online); - onlineService.create(online); - break; - case SendTagConstant.PING: - PingMessage pingMessage = message.getPingMessage(); - log.info("充电桩心跳包-业务消息处理:{}",pingMessage); - // 持久化消息 - Ping ping = new Ping(); - BeanUtils.copyProperties(pingMessage,ping); - pingService.save(ping); - //存储缓存中,5分钟有效 - redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor.execute(new Runnable() { - @Override - public void run() { - UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); - vo1.setGun_code(pingMessage.getCharging_gun_code()); - vo1.setPile_code(pingMessage.getCharging_pile_code()); - vo1.setStatus(pingMessage.getCharging_gun_status()); - chargingPileClient.updateChargingPileStatus(vo1); - } - }); - break; - case SendTagConstant.END_CHARGE: - EndChargeMessage endChargeMessage = message.getEndChargeMessage(); - log.info("充电结束-业务消息处理:{}",endChargeMessage); - // 持久化消息 - EndCharge endCharge = new EndCharge(); - BeanUtils.copyProperties(endChargeMessage,endCharge); - endChargeService.create(endCharge); - ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor1.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); - // 订单id - String transactionSerialNumber = endCharge.getTransaction_serial_number(); - ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); - chargingOrderMessage.setOrderNumber(transactionSerialNumber); - // 推送充电订单信息 - ChargingMessage chargingMessage1 = new ChargingMessage(); - chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); - chargingMessage1.setOrderMessage(chargingOrderMessage); - enhanceProduce.orderInfoMessage(chargingMessage1); - // 推送充电订单状态 - ChargingMessage chargingMessage2 = new ChargingMessage(); - chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); - chargingMessage2.setOrderMessage(chargingOrderMessage); - enhanceProduce.orderStatusMessage(chargingMessage2); -// try { -// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); -// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); -// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); -// }catch (Exception e){ -// e.printStackTrace(); -// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); +//package com.ruoyi.integration.rocket.produce; +// +//import com.alibaba.fastjson.JSON; +//import com.alibaba.fastjson.JSONObject; +//import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; +//import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; +//import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; +//import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient; +//import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; +//import com.ruoyi.chargingPile.api.model.TChargingGun; +//import com.ruoyi.chargingPile.api.model.TFaultMessage; +//import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode; +//import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; +//import com.ruoyi.common.redis.service.RedisService; +//import com.ruoyi.integration.api.model.*; +//import com.ruoyi.integration.drainage.TCECPushUtil; +//import com.ruoyi.integration.iotda.constant.SendTagConstant; +//import com.ruoyi.integration.iotda.enums.ServiceIdMenu; +//import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter; +//import com.ruoyi.integration.mongodb.service.*; +//import com.ruoyi.integration.rocket.model.*; +//import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; +//import com.ruoyi.order.api.feignClient.ChargingOrderClient; +//import com.ruoyi.order.api.model.TChargingOrder; +//import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; +//import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; +//import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; +//import com.ruoyi.order.api.vo.SecurityDetectionVO; +//import com.ruoyi.order.api.vo.TransactionRecordMessageVO; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.rocketmq.spring.annotation.MessageModel; +//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +//import org.apache.rocketmq.spring.core.RocketMQListener; +//import org.springframework.beans.BeanUtils; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.stereotype.Component; +//import org.springframework.util.StringUtils; +// +//import javax.annotation.Resource; +//import java.math.RoundingMode; +//import java.time.LocalDateTime; +//import java.util.Date; +//import java.util.Objects; +//import java.util.Set; +//import java.util.concurrent.LinkedBlockingQueue; +//import java.util.concurrent.ThreadPoolExecutor; +//import java.util.concurrent.TimeUnit; +// +//@Slf4j +//@Component +//@RocketMQMessageListener( +// messageModel = MessageModel.CLUSTERING, +// consumerGroup = "charge_charging_message", +// topic = "charge_charging_message", +// selectorExpression = "charging_message", +// consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 +//) +//public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { +// +// @Autowired +// private AcquisitionBillingModeService acquisitionBillingModeService; +// @Autowired +// private BillingModeVerifyService billingModeVerifyService; +// @Autowired +// private BmsAbortService bmsAbortService; +// @Resource +// private ChargingOrderClient chargingOrderClient; +// @Autowired +// private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; +// @Autowired +// private OnlineService onlineService; +// @Autowired +// private PingService pingService; +// @Autowired +// private EndChargeService endChargeService; +// @Autowired +// private ErrorMessageMessageService errorMessageMessageService; +// @Autowired +// private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; +// @Resource +// private AccountingStrategyDetailClient accountingStrategyDetailClient; +// @Autowired +// private ChargingHandshakeService chargingHandshakeService; +// @Autowired +// private ParameterSettingService parameterSettingService; +// @Autowired +// private MotorAbortService motorAbortService; +// @Autowired +// private BmsInformationService bmsInformationService; +// @Autowired +// private ChargingPileStartsChargingService chargingPileStartsChargingService; +// @Autowired +// private PlatformStartChargingReplyService platformStartChargingReplyService; +// @Autowired +// private PlatformStopChargingReplyService platformStopChargingReplyService; +// @Autowired +// private TransactionRecordService transactionRecordService; +// @Autowired +// private UpdateBalanceReplyService updateBalanceReplyService; +// @Autowired +// private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; +// @Autowired +// private ClearOfflineCardReplyService clearOfflineCardReplyService; +// @Autowired +// private WorkingParameterSettingReplyService workingParameterSettingReplyService; +// @Autowired +// private TimingSettingService timingSettingService; +// @Autowired +// private SetupBillingModelReplyService setupBillingModelReplyService; +// @Autowired +// private GroundLockRealTimeDataService groundLockRealTimeDataService; +// @Autowired +// private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; +// @Autowired +// private PlatformRestartReplyService platformRestartReplyService; +// @Autowired +// private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; +// @Autowired +// private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; +// @Autowired +// private SecurityDetectionService securityDetectionService; +// @Autowired +// private TCECPushUtil tcecPushUtil; +// +// @Resource +// private ChargingPileClient chargingPileClient; +// @Resource +// private ChargingGunClient chargingGunClient; +// +// @Resource +// private RedisTemplate redisTemplate; +// +// @Autowired +// private EnhanceProduce enhanceProduce; +// +// +// +// @Override +// protected void handleMessage(ChargingMessage message) throws Exception { +// log.info("rocket收到的消息内容:{}",message); +// String serviceId = message.getServiceId(); +// if(!StringUtils.hasLength(serviceId)){ +// return; +// } +// log.info("rocket收到的消息内容:{} {}", serviceId,message); +// switch (serviceId){ +// case SendTagConstant.ONLINE: +// OnlineMessage onlineMessage = message.getOnlineMessage(); +// log.info("充电桩登录认证业务消息处理:{}",onlineMessage); +// // 持久化消息 +// Online online = new Online(); +// BeanUtils.copyProperties(onlineMessage,online); +// onlineService.create(online); +// break; +// case SendTagConstant.PING: +// PingMessage pingMessage = message.getPingMessage(); +// log.info("充电桩心跳包-业务消息处理:{}",pingMessage); +// // 持久化消息 +// Ping ping = new Ping(); +// BeanUtils.copyProperties(pingMessage,ping); +// pingService.save(ping); +// //存储缓存中,5分钟有效 +// redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); +// ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor.execute(new Runnable() { +// @Override +// public void run() { +// UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); +// vo1.setGun_code(pingMessage.getCharging_gun_code()); +// vo1.setPile_code(pingMessage.getCharging_pile_code()); +// vo1.setStatus(pingMessage.getCharging_gun_status()); +// chargingPileClient.updateChargingPileStatus(vo1); +// } +// }); +// break; +// case SendTagConstant.END_CHARGE: +// EndChargeMessage endChargeMessage = message.getEndChargeMessage(); +// log.info("充电结束-业务消息处理:{}",endChargeMessage); +// // 持久化消息 +// EndCharge endCharge = new EndCharge(); +// BeanUtils.copyProperties(endChargeMessage,endCharge); +// endChargeService.create(endCharge); +// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor1.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); +// // 订单id +// String transactionSerialNumber = endCharge.getTransaction_serial_number(); +// ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); +// chargingOrderMessage.setOrderNumber(transactionSerialNumber); +// // 推送充电订单信息 +// ChargingMessage chargingMessage1 = new ChargingMessage(); +// chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); +// chargingMessage1.setOrderMessage(chargingOrderMessage); +// enhanceProduce.orderInfoMessage(chargingMessage1); +// // 推送充电订单状态 +// ChargingMessage chargingMessage2 = new ChargingMessage(); +// chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); +// chargingMessage2.setOrderMessage(chargingOrderMessage); +// enhanceProduce.orderStatusMessage(chargingMessage2); +//// try { +//// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); +//// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); +//// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); +//// }catch (Exception e){ +//// e.printStackTrace(); +//// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); +//// } +// } +// }); +// break; +// case SendTagConstant.ERROR_MESSAGE: +// ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); +// log.info("错误报文-业务消息处理:{}",errorMessageMessage1); +// // 持久化消息 +// ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); +// BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); +// errorMessageMessageService.create(errorMessageMessage); +// break; +// case SendTagConstant.BILLING_MODE_VERIFY: +// BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); +// log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); +// // 持久化消息 +// BillingModeVerify billingModeVerify = new BillingModeVerify(); +// BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); +// billingModeVerifyService.create(billingModeVerify); +// break; +// case SendTagConstant.ACQUISITION_BILLING_MODE: +// AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); +// log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); +// // 持久化消息 +// AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); +// BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); +// acquisitionBillingModeService.create(acquisitionBillingMode); +// break; +// case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: +// try { +// UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); +// log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); +// // 持久化消息 +// UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); +// BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); +// // 查询mogondb上一条数据 +// UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); +// // 查询订单 +// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); +// // 查询当前时间段的计费策略 +// TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); +// uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); +// uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); +// if (Objects.nonNull(data)) { +// uploadRealTimeMonitoringDataService.updateById(data.getId()); +// uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); +// uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); +// uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); +// }else { +// log.info("首次上传实时监测数据"); +// uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); +// uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); +// uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); +// } +// uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); +// uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); +// uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); +//// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); +//// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); +// int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); +// if(i == 0){ +// log.error("数据存储mongo失败"); +// } +// +// ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor2.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); +// BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); +// chargingOrderClient.chargeMonitoring(query); +// chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); +// ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); +// chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); +// chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); +// // 推送充电订单信息 +// ChargingMessage chargingMessage4 = new ChargingMessage(); +// chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); +// chargingMessage4.setOrderMessage(chargingOrderMessage3); +// enhanceProduce.orderInfoMessage(chargingMessage4); +//// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); // } - } - }); - break; - case SendTagConstant.ERROR_MESSAGE: - ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); - log.info("错误报文-业务消息处理:{}",errorMessageMessage1); - // 持久化消息 - ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); - BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); - errorMessageMessageService.create(errorMessageMessage); - break; - case SendTagConstant.BILLING_MODE_VERIFY: - BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); - log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); - // 持久化消息 - BillingModeVerify billingModeVerify = new BillingModeVerify(); - BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); - billingModeVerifyService.create(billingModeVerify); - break; - case SendTagConstant.ACQUISITION_BILLING_MODE: - AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); - log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); - // 持久化消息 - AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); - BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); - acquisitionBillingModeService.create(acquisitionBillingMode); - break; - case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: - try { - UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); - log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); - // 持久化消息 - UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); - BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); - // 查询mogondb上一条数据 - UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); - // 查询订单 - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); - // 查询当前时间段的计费策略 - TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); - uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); - uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); - if (Objects.nonNull(data)) { - uploadRealTimeMonitoringDataService.updateById(data.getId()); - uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); - uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); - uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); - }else { - log.info("首次上传实时监测数据"); - uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); - uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); - uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); - } - uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); - uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); - uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); -// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); -// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); - int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); - if(i == 0){ - log.error("数据存储mongo失败"); - } - - ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor2.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); - BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); - chargingOrderClient.chargeMonitoring(query); - chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); - ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); - chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); - chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); - // 推送充电订单信息 - ChargingMessage chargingMessage4 = new ChargingMessage(); - chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); - chargingMessage4.setOrderMessage(chargingOrderMessage3); - enhanceProduce.orderInfoMessage(chargingMessage4); -// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - break; - case SendTagConstant.CHARGING_HANDSHAKE: - ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); - log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); - // 持久化消息 - ChargingHandshake chargingHandshake = new ChargingHandshake(); - BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); - chargingHandshakeService.create(chargingHandshake); - break; - case SendTagConstant.PARAMETER_SETTING: - ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); - log.info("业务消息处理:{}",parameterSettingMessage); - // 持久化消息 - ParameterSetting parameterSetting = new ParameterSetting(); - BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); - parameterSettingService.create(parameterSetting); - break; - case SendTagConstant.BMS_ABORT: - BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); - log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); - // 持久化消息 - BmsAbort bmsAbort = new BmsAbort(); - BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); - bmsAbortService.create(bmsAbort); - - ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor3.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); - } - }); - break; - case SendTagConstant.MOTOR_ABORT: - MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); - log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); - // 持久化消息 - MotorAbort motorAbort = new MotorAbort(); - BeanUtils.copyProperties(motorAbortMessage,motorAbort); - motorAbortService.create(motorAbort); - ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor4.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); - } - }); - break; - case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: - BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); - log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); - // 持久化消息 - BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); - BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); - bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); - ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor5.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderBms)){ - chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); - chargingOrderClient.updateChargingOrder(chargingOrderBms); - } - } - }); - break; - case SendTagConstant.BMS_INFORMATION: - BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); - log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); - // 持久化消息 - BmsInformation bmsInformation = new BmsInformation(); - BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); - bmsInformationService.create(bmsInformation); - break; - case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: - ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); - log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); - // 持久化消息 - ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); - BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); - chargingPileStartsChargingService.create(chargingPileStartsCharging); - break; - case SendTagConstant.PLATFORM_START_CHARGING_REPLY: - PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); - log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); - // 持久化消息 - PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); - BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); - platformStartChargingReplyService.create(platformStartChargingReply); - ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor6.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); - BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); - chargingOrderClient.startChargeSuccessfully(message1); - } - }); - break; - case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: - PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); - log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); - // 持久化消息 - PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); - BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); - platformStopChargingReplyService.create(platformStopChargingReply); - ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor7.execute(new Runnable() { - @Override - public void run() { - PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); - BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); - chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); - } - }); - break; - case SendTagConstant.TRANSACTION_RECORD: - TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); - log.info("交易记录-业务消息处理:{}",transactionRecordMessage); - transactionRecordMessage.setResult(JSONObject.toJSONString(message)); - // 持久化消息 - TransactionRecord transactionRecord = new TransactionRecord(); - BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); - transactionRecordService.create(transactionRecord); - ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor8.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderRecord)){ - chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); - chargingOrderClient.updateChargingOrder(chargingOrderRecord); - } - //计算费用 - TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); - BeanUtils.copyProperties(transactionRecordMessage,vo); - int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); - if(200 != code){ - //失败后添加到队列中继续处理数据 - redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); - } - } - }); - - - // 添加实时上传记录结束记录 - // 查询mogondb上一条数据 - UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); - if(Objects.nonNull(data) && data.getStatus() != 5){ - UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); - BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); - uploadRealTimeMonitoringData.setStatus(5); - uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); - } - break; - case SendTagConstant.UPDATE_BALANCE_REPLY: - UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); - log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); - // 持久化消息 - UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); - BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); - updateBalanceReplyService.create(updateBalanceReply); - break; - case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: - SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); - log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); - // 持久化消息 - SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); - BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); - synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); - break; - case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: - ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); - log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); - // 持久化消息 - ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); - BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); - clearOfflineCardReplyService.create(clearOfflineCardReply); - break; - case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: - WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); - log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); - // 持久化消息 - WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); - BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); - workingParameterSettingReplyService.create(workingParameterSettingReply); - break; - case SendTagConstant.TIMING_SETTING: - TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); - log.info("对时设置-业务消息处理:{}",timingSettingMessage); - // 持久化消息 - TimingSetting timingSetting = new TimingSetting(); - BeanUtils.copyProperties(timingSettingMessage,timingSetting); - timingSettingService.create(timingSetting); - break; - case SendTagConstant.SETUP_BILLING_MODEL_REPLY: - SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); - log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); - // 持久化消息 - SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); - BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); - setupBillingModelReplyService.create(setupBillingModelReply); - break; - case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: - GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); - log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); - // 持久化消息 - GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); - BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); - groundLockRealTimeDataService.create(groundLockRealTimeData); - break; - case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: - ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); - log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); - // 持久化消息 - ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); - BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); - chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); - break; - case SendTagConstant.PLATFORM_RESTART_REPLY: - PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); - log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); - // 持久化消息 - PlatformRestartReply platformRestartReply = new PlatformRestartReply(); - BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); - platformRestartReplyService.create(platformRestartReply); - break; - case SendTagConstant.QR_CODE_DELIVERY_REPLY: - QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); - log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); - QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); - BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); - qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); - break; - case SendTagConstant.SECURITY_DETECTION: - SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); - log.info("安全监测-业务消息处理:{}",securityDetectionMessage); - SecurityDetection securityDetection = new SecurityDetection(); - BeanUtils.copyProperties(securityDetectionMessage,securityDetection); - securityDetectionService.create(securityDetection); - ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor9.execute(new Runnable() { - @Override - public void run() { - SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); - BeanUtils.copyProperties(securityDetection, securityDetection1); - chargingOrderClient.securityDetection(securityDetection1); - } - }); - break; - default: - PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); - log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); - PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); - BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); - platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); - break; - } - } - - @Override - protected void handleMaxRetriesExceeded(ChargingMessage message) { - // 当超过指定重试次数消息时此处方法会被调用 - // 生产中可以进行回退或其他业务操作 - log.error("消息消费失败,请执行后续处理"); - } - - - /** - * 是否执行重试机制 - */ - @Override - protected boolean isRetry() { - return true; - } - - @Override - protected boolean throwException() { - // 是否抛出异常,false搭配retry自行处理异常 - return false; - } - - /** - * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 - * @param message 待处理消息 - * @return true: 本次消息被过滤,false:不过滤 - */ - @Override - protected boolean filter(ChargingMessage message) { - // 此处可做消息过滤 - return false; - } - - /** - * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 - */ - @Override - public void onMessage(ChargingMessage message) { - super.dispatchMessage(message); - } - - - /** - * 处理未正常完成费用计算的订单 - */ - public void transactionRecord(){ - Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD); - for (String member : members) { - TransactionRecord transactionRecord = transactionRecordService.findOne(member); - if(null == transactionRecord){ - redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); - }else{ - TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); - BeanUtils.copyProperties(transactionRecord, vo); - int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); - if(200 == code){ - redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); - } - } - } - } -} \ No newline at end of file +// }); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// break; +// case SendTagConstant.CHARGING_HANDSHAKE: +// ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); +// log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); +// // 持久化消息 +// ChargingHandshake chargingHandshake = new ChargingHandshake(); +// BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); +// chargingHandshakeService.create(chargingHandshake); +// break; +// case SendTagConstant.PARAMETER_SETTING: +// ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); +// log.info("业务消息处理:{}",parameterSettingMessage); +// // 持久化消息 +// ParameterSetting parameterSetting = new ParameterSetting(); +// BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); +// parameterSettingService.create(parameterSetting); +// break; +// case SendTagConstant.BMS_ABORT: +// BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); +// log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); +// // 持久化消息 +// BmsAbort bmsAbort = new BmsAbort(); +// BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); +// bmsAbortService.create(bmsAbort); +// +// ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor3.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); +// } +// }); +// break; +// case SendTagConstant.MOTOR_ABORT: +// MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); +// log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); +// // 持久化消息 +// MotorAbort motorAbort = new MotorAbort(); +// BeanUtils.copyProperties(motorAbortMessage,motorAbort); +// motorAbortService.create(motorAbort); +// ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor4.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); +// } +// }); +// break; +// case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: +// BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); +// log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); +// // 持久化消息 +// BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); +// BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); +// bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); +// ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor5.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); +// if(Objects.nonNull(chargingOrderBms)){ +// chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); +// chargingOrderClient.updateChargingOrder(chargingOrderBms); +// } +// } +// }); +// break; +// case SendTagConstant.BMS_INFORMATION: +// BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); +// log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); +// // 持久化消息 +// BmsInformation bmsInformation = new BmsInformation(); +// BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); +// bmsInformationService.create(bmsInformation); +// break; +// case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: +// ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); +// log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); +// // 持久化消息 +// ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); +// BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); +// chargingPileStartsChargingService.create(chargingPileStartsCharging); +// break; +// case SendTagConstant.PLATFORM_START_CHARGING_REPLY: +// PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); +// log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); +// // 持久化消息 +// PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); +// BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); +// platformStartChargingReplyService.create(platformStartChargingReply); +// ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor6.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); +// BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); +// chargingOrderClient.startChargeSuccessfully(message1); +// } +// }); +// break; +// case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: +// PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); +// log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); +// // 持久化消息 +// PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); +// BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); +// platformStopChargingReplyService.create(platformStopChargingReply); +// ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor7.execute(new Runnable() { +// @Override +// public void run() { +// PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); +// BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); +// chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); +// } +// }); +// break; +// case SendTagConstant.TRANSACTION_RECORD: +// TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); +// log.info("交易记录-业务消息处理:{}",transactionRecordMessage); +// transactionRecordMessage.setResult(JSONObject.toJSONString(message)); +// // 持久化消息 +// TransactionRecord transactionRecord = new TransactionRecord(); +// BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); +// transactionRecordService.create(transactionRecord); +// ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor8.execute(new Runnable() { +// @Override +// public void run() { +// // 业务处理 +// TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); +// if(Objects.nonNull(chargingOrderRecord)){ +// chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); +// chargingOrderClient.updateChargingOrder(chargingOrderRecord); +// } +// //计算费用 +// TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); +// BeanUtils.copyProperties(transactionRecordMessage,vo); +// int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); +// if(200 != code){ +// //失败后添加到队列中继续处理数据 +// redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); +// } +// } +// }); +// +// +// // 添加实时上传记录结束记录 +// // 查询mogondb上一条数据 +// UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); +// if(Objects.nonNull(data) && data.getStatus() != 5){ +// UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); +// BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); +// uploadRealTimeMonitoringData.setStatus(5); +// uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); +// } +// break; +// case SendTagConstant.UPDATE_BALANCE_REPLY: +// UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); +// log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); +// // 持久化消息 +// UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); +// BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); +// updateBalanceReplyService.create(updateBalanceReply); +// break; +// case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: +// SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); +// log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); +// // 持久化消息 +// SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); +// BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); +// synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); +// break; +// case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: +// ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); +// log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); +// // 持久化消息 +// ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); +// BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); +// clearOfflineCardReplyService.create(clearOfflineCardReply); +// break; +// case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: +// WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); +// log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); +// // 持久化消息 +// WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); +// BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); +// workingParameterSettingReplyService.create(workingParameterSettingReply); +// break; +// case SendTagConstant.TIMING_SETTING: +// TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); +// log.info("对时设置-业务消息处理:{}",timingSettingMessage); +// // 持久化消息 +// TimingSetting timingSetting = new TimingSetting(); +// BeanUtils.copyProperties(timingSettingMessage,timingSetting); +// timingSettingService.create(timingSetting); +// break; +// case SendTagConstant.SETUP_BILLING_MODEL_REPLY: +// SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); +// log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); +// // 持久化消息 +// SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); +// BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); +// setupBillingModelReplyService.create(setupBillingModelReply); +// break; +// case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: +// GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); +// log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); +// // 持久化消息 +// GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); +// BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); +// groundLockRealTimeDataService.create(groundLockRealTimeData); +// break; +// case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: +// ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); +// log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); +// // 持久化消息 +// ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); +// BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); +// chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); +// break; +// case SendTagConstant.PLATFORM_RESTART_REPLY: +// PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); +// log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); +// // 持久化消息 +// PlatformRestartReply platformRestartReply = new PlatformRestartReply(); +// BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); +// platformRestartReplyService.create(platformRestartReply); +// break; +// case SendTagConstant.QR_CODE_DELIVERY_REPLY: +// QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); +// log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); +// QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); +// BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); +// qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); +// break; +// case SendTagConstant.SECURITY_DETECTION: +// SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); +// log.info("安全监测-业务消息处理:{}",securityDetectionMessage); +// SecurityDetection securityDetection = new SecurityDetection(); +// BeanUtils.copyProperties(securityDetectionMessage,securityDetection); +// securityDetectionService.create(securityDetection); +// ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor9.execute(new Runnable() { +// @Override +// public void run() { +// SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); +// BeanUtils.copyProperties(securityDetection, securityDetection1); +// chargingOrderClient.securityDetection(securityDetection1); +// } +// }); +// break; +// default: +// PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); +// log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); +// PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); +// BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); +// platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); +// break; +// } +// } +// +// @Override +// protected void handleMaxRetriesExceeded(ChargingMessage message) { +// // 当超过指定重试次数消息时此处方法会被调用 +// // 生产中可以进行回退或其他业务操作 +// log.error("消息消费失败,请执行后续处理"); +// } +// +// +// /** +// * 是否执行重试机制 +// */ +// @Override +// protected boolean isRetry() { +// return true; +// } +// +// @Override +// protected boolean throwException() { +// // 是否抛出异常,false搭配retry自行处理异常 +// return false; +// } +// +// /** +// * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 +// * @param message 待处理消息 +// * @return true: 本次消息被过滤,false:不过滤 +// */ +// @Override +// protected boolean filter(ChargingMessage message) { +// // 此处可做消息过滤 +// return false; +// } +// +// /** +// * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 +// */ +// @Override +// public void onMessage(ChargingMessage message) { +// super.dispatchMessage(message); +// } +// +// +// /** +// * 处理未正常完成费用计算的订单 +// */ +// public void transactionRecord(){ +// Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD); +// for (String member : members) { +// TransactionRecord transactionRecord = transactionRecordService.findOne(member); +// if(null == transactionRecord){ +// redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); +// }else{ +// TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); +// BeanUtils.copyProperties(transactionRecord, vo); +// int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); +// if(200 == code){ +// redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); +// } +// } +// } +// } +//} \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java index 8072fab..aaab896 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java @@ -1,527 +1,522 @@ -package com.ruoyi.integration.rocket.produce; - -import com.alibaba.fastjson.JSONObject; -import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; -import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; -import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; -import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; -import com.ruoyi.chargingPile.api.model.TChargingGun; -import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; -import com.ruoyi.integration.api.model.*; -import com.ruoyi.integration.drainage.TCECPushUtil; -import com.ruoyi.integration.iotda.constant.SendTagConstant; -import com.ruoyi.integration.mongodb.service.*; -import com.ruoyi.integration.rocket.model.*; -import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; -import com.ruoyi.order.api.feignClient.ChargingOrderClient; -import com.ruoyi.order.api.model.TChargingOrder; -import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; -import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; -import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; -import com.ruoyi.order.api.vo.SecurityDetectionVO; -import com.ruoyi.order.api.vo.TransactionRecordMessageVO; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.math.RoundingMode; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - - -@Slf4j -@Component -public class ChargingMessageUtil { - - @Autowired - private AcquisitionBillingModeService acquisitionBillingModeService; - @Autowired - private BillingModeVerifyService billingModeVerifyService; - @Autowired - private BmsAbortService bmsAbortService; - @Resource - private ChargingOrderClient chargingOrderClient; - @Autowired - private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; - @Autowired - private OnlineService onlineService; - @Autowired - private PingService pingService; - @Autowired - private EndChargeService endChargeService; - @Autowired - private ErrorMessageMessageService errorMessageMessageService; - @Autowired - private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; - @Resource - private AccountingStrategyDetailClient accountingStrategyDetailClient; - @Autowired - private ChargingHandshakeService chargingHandshakeService; - @Autowired - private ParameterSettingService parameterSettingService; - @Autowired - private MotorAbortService motorAbortService; - @Autowired - private BmsInformationService bmsInformationService; - @Autowired - private ChargingPileStartsChargingService chargingPileStartsChargingService; - @Autowired - private PlatformStartChargingReplyService platformStartChargingReplyService; - @Autowired - private PlatformStopChargingReplyService platformStopChargingReplyService; - @Autowired - private TransactionRecordService transactionRecordService; - @Autowired - private UpdateBalanceReplyService updateBalanceReplyService; - @Autowired - private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; - @Autowired - private ClearOfflineCardReplyService clearOfflineCardReplyService; - @Autowired - private WorkingParameterSettingReplyService workingParameterSettingReplyService; - @Autowired - private TimingSettingService timingSettingService; - @Autowired - private SetupBillingModelReplyService setupBillingModelReplyService; - @Autowired - private GroundLockRealTimeDataService groundLockRealTimeDataService; - @Autowired - private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; - @Autowired - private PlatformRestartReplyService platformRestartReplyService; - @Autowired - private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; - @Autowired - private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; - @Autowired - private SecurityDetectionService securityDetectionService; - @Autowired - private TCECPushUtil tcecPushUtil; - - @Resource - private ChargingPileClient chargingPileClient; - @Resource - private ChargingGunClient chargingGunClient; - - @Resource - private RedisTemplate redisTemplate; - @Autowired - private EnhanceProduce enhanceProduce; - - - - - - public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ - log.info("rocket收到的消息内容:{}",message); - String serviceId = message.getServiceId(); - if(!StringUtils.hasLength(serviceId)){ - return; - } - log.info("rocket收到的消息内容:{} {}", serviceId,message); - switch (serviceId){ - case SendTagConstant.ONLINE: - OnlineMessage onlineMessage = message.getOnlineMessage(); - log.info("充电桩登录认证业务消息处理:{}",onlineMessage); - // 持久化消息 - Online online = new Online(); - BeanUtils.copyProperties(onlineMessage,online); - onlineService.create(online); - break; - case SendTagConstant.PING: - PingMessage pingMessage = message.getPingMessage(); - log.info("充电桩心跳包-业务消息处理:{}",pingMessage); - // 持久化消息 - Ping ping = new Ping(); - BeanUtils.copyProperties(pingMessage,ping); - pingService.save(ping); - //存储缓存中,5分钟有效 - redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); - - UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); - vo1.setGun_code(pingMessage.getCharging_gun_code()); - vo1.setPile_code(pingMessage.getCharging_pile_code()); - vo1.setStatus(pingMessage.getCharging_gun_status()); - chargingPileClient.updateChargingPileStatus(vo1); - // 监管平台推送充电设备状态 - SendResult sendResult; - String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); - ChargingMessage chargingMessage = new ChargingMessage(); - chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); - GunStatusMessage gunStatusMessage = new GunStatusMessage(); - gunStatusMessage.setFullNumber(gunCode); - chargingMessage.setGunStatusMessage(gunStatusMessage); - sendResult = enhanceProduce.gunStatusMessage(chargingMessage); - - break; - case SendTagConstant.END_CHARGE: - EndChargeMessage endChargeMessage = message.getEndChargeMessage(); - log.info("充电结束-业务消息处理:{}",endChargeMessage); - // 持久化消息 - EndCharge endCharge = new EndCharge(); - BeanUtils.copyProperties(endChargeMessage,endCharge); - endChargeService.create(endCharge); - // 业务处理 - chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); - // 订单id - String transactionSerialNumber = endCharge.getTransaction_serial_number(); - ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); - chargingOrderMessage.setOrderNumber(transactionSerialNumber); - // 推送充电订单信息 - ChargingMessage chargingMessage1 = new ChargingMessage(); - chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); - chargingMessage1.setOrderMessage(chargingOrderMessage); - enhanceProduce.orderInfoMessage(chargingMessage1); - // 推送充电订单状态 - ChargingMessage chargingMessage2 = new ChargingMessage(); - chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); - chargingMessage2.setOrderMessage(chargingOrderMessage); - enhanceProduce.orderStatusMessage(chargingMessage2); -// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); -// threadPoolExecutor1.execute(new Runnable() { +//package com.ruoyi.integration.rocket.produce; +// +//import com.alibaba.fastjson.JSONObject; +//import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; +//import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; +//import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; +//import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; +//import com.ruoyi.chargingPile.api.model.TChargingGun; +//import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; +//import com.ruoyi.integration.api.model.*; +//import com.ruoyi.integration.drainage.TCECPushUtil; +//import com.ruoyi.integration.iotda.constant.SendTagConstant; +//import com.ruoyi.integration.mongodb.service.*; +//import com.ruoyi.integration.rocket.model.*; +//import com.ruoyi.order.api.feignClient.ChargingOrderClient; +//import com.ruoyi.order.api.model.TChargingOrder; +//import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; +//import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; +//import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; +//import com.ruoyi.order.api.vo.SecurityDetectionVO; +//import com.ruoyi.order.api.vo.TransactionRecordMessageVO; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.rocketmq.client.producer.SendResult; +//import org.springframework.beans.BeanUtils; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.stereotype.Component; +//import org.springframework.util.StringUtils; +// +//import javax.annotation.Resource; +//import java.math.RoundingMode; +//import java.util.Objects; +//import java.util.Set; +//import java.util.concurrent.LinkedBlockingQueue; +//import java.util.concurrent.ThreadPoolExecutor; +//import java.util.concurrent.TimeUnit; +// +// +//@Slf4j +//@Component +//public class ChargingMessageUtil { +// +// @Autowired +// private AcquisitionBillingModeService acquisitionBillingModeService; +// @Autowired +// private BillingModeVerifyService billingModeVerifyService; +// @Autowired +// private BmsAbortService bmsAbortService; +// @Resource +// private ChargingOrderClient chargingOrderClient; +// @Autowired +// private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; +// @Autowired +// private OnlineService onlineService; +// @Autowired +// private PingService pingService; +// @Autowired +// private EndChargeService endChargeService; +// @Autowired +// private ErrorMessageMessageService errorMessageMessageService; +// @Autowired +// private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; +// @Resource +// private AccountingStrategyDetailClient accountingStrategyDetailClient; +// @Autowired +// private ChargingHandshakeService chargingHandshakeService; +// @Autowired +// private ParameterSettingService parameterSettingService; +// @Autowired +// private MotorAbortService motorAbortService; +// @Autowired +// private BmsInformationService bmsInformationService; +// @Autowired +// private ChargingPileStartsChargingService chargingPileStartsChargingService; +// @Autowired +// private PlatformStartChargingReplyService platformStartChargingReplyService; +// @Autowired +// private PlatformStopChargingReplyService platformStopChargingReplyService; +// @Autowired +// private TransactionRecordService transactionRecordService; +// @Autowired +// private UpdateBalanceReplyService updateBalanceReplyService; +// @Autowired +// private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; +// @Autowired +// private ClearOfflineCardReplyService clearOfflineCardReplyService; +// @Autowired +// private WorkingParameterSettingReplyService workingParameterSettingReplyService; +// @Autowired +// private TimingSettingService timingSettingService; +// @Autowired +// private SetupBillingModelReplyService setupBillingModelReplyService; +// @Autowired +// private GroundLockRealTimeDataService groundLockRealTimeDataService; +// @Autowired +// private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; +// @Autowired +// private PlatformRestartReplyService platformRestartReplyService; +// @Autowired +// private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; +// @Autowired +// private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; +// @Autowired +// private SecurityDetectionService securityDetectionService; +// @Autowired +// private TCECPushUtil tcecPushUtil; +// +// @Resource +// private ChargingPileClient chargingPileClient; +// @Resource +// private ChargingGunClient chargingGunClient; +// +// @Resource +// private RedisTemplate redisTemplate; +// @Autowired +// private EnhanceProduce enhanceProduce; +// +// +// +// +// +// public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ +// log.info("rocket收到的消息内容:{}",message); +// String serviceId = message.getServiceId(); +// if(!StringUtils.hasLength(serviceId)){ +// return; +// } +// log.info("rocket收到的消息内容:{} {}", serviceId,message); +// switch (serviceId){ +// case SendTagConstant.ONLINE: +// OnlineMessage onlineMessage = message.getOnlineMessage(); +// log.info("充电桩登录认证业务消息处理:{}",onlineMessage); +// // 持久化消息 +// Online online = new Online(); +// BeanUtils.copyProperties(onlineMessage,online); +// onlineService.create(online); +// break; +// case SendTagConstant.PING: +// PingMessage pingMessage = message.getPingMessage(); +// log.info("充电桩心跳包-业务消息处理:{}",pingMessage); +// // 持久化消息 +// Ping ping = new Ping(); +// BeanUtils.copyProperties(pingMessage,ping); +// pingService.save(ping); +// //存储缓存中,5分钟有效 +// redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); +// +// UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); +// vo1.setGun_code(pingMessage.getCharging_gun_code()); +// vo1.setPile_code(pingMessage.getCharging_pile_code()); +// vo1.setStatus(pingMessage.getCharging_gun_status()); +// chargingPileClient.updateChargingPileStatus(vo1); +// // 监管平台推送充电设备状态 +// SendResult sendResult; +// String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); +// ChargingMessage chargingMessage = new ChargingMessage(); +// chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); +// GunStatusMessage gunStatusMessage = new GunStatusMessage(); +// gunStatusMessage.setFullNumber(gunCode); +// chargingMessage.setGunStatusMessage(gunStatusMessage); +// sendResult = enhanceProduce.gunStatusMessage(chargingMessage); +// +// break; +// case SendTagConstant.END_CHARGE: +// EndChargeMessage endChargeMessage = message.getEndChargeMessage(); +// log.info("充电结束-业务消息处理:{}",endChargeMessage); +// // 持久化消息 +// EndCharge endCharge = new EndCharge(); +// BeanUtils.copyProperties(endChargeMessage,endCharge); +// endChargeService.create(endCharge); +// // 业务处理 +// chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); +// // 订单id +// String transactionSerialNumber = endCharge.getTransaction_serial_number(); +// ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); +// chargingOrderMessage.setOrderNumber(transactionSerialNumber); +// // 推送充电订单信息 +// ChargingMessage chargingMessage1 = new ChargingMessage(); +// chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); +// chargingMessage1.setOrderMessage(chargingOrderMessage); +// enhanceProduce.orderInfoMessage(chargingMessage1); +// // 推送充电订单状态 +// ChargingMessage chargingMessage2 = new ChargingMessage(); +// chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); +// chargingMessage2.setOrderMessage(chargingOrderMessage); +// enhanceProduce.orderStatusMessage(chargingMessage2); +//// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +//// threadPoolExecutor1.execute(new Runnable() { +//// @Override +//// public void run() { +//// try { +//// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); +//// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); +//// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); +//// }catch (Exception e){ +//// e.printStackTrace(); +//// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); +//// } +//// } +//// }); +// break; +// case SendTagConstant.ERROR_MESSAGE: +// ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); +// log.info("错误报文-业务消息处理:{}",errorMessageMessage1); +// // 持久化消息 +// ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); +// BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); +// errorMessageMessageService.create(errorMessageMessage); +// break; +// case SendTagConstant.BILLING_MODE_VERIFY: +// BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); +// log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); +// // 持久化消息 +// BillingModeVerify billingModeVerify = new BillingModeVerify(); +// BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); +// billingModeVerifyService.create(billingModeVerify); +// break; +// case SendTagConstant.ACQUISITION_BILLING_MODE: +// AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); +// log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); +// // 持久化消息 +// AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); +// BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); +// acquisitionBillingModeService.create(acquisitionBillingMode); +// break; +// case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: +// try { +// UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); +// log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); +// // 持久化消息 +// UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); +// BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); +// // 查询mogondb上一条数据 +// UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); +// // 查询订单 +// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); +// // 查询当前时间段的计费策略 +// TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); +// uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); +// uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); +// if (Objects.nonNull(data)) { +// uploadRealTimeMonitoringDataService.updateById(data.getId()); +// uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); +// uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); +// uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); +// }else { +// log.info("首次上传实时监测数据"); +// uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); +// uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); +// uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); +// } +// uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); +// uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); +// uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); +//// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); +//// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); +// int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); +// if(i == 0){ +// log.error("数据存储mongo失败"); +// } +// +// // 业务处理 +// UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); +// BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); +// chargingOrderClient.chargeMonitoring(query); +// // 订单id +// ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); +// chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); +// // 推送充电订单信息 +// ChargingMessage chargingMessage4 = new ChargingMessage(); +// chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); +// chargingMessage4.setOrderMessage(chargingOrderMessage3); +// enhanceProduce.orderInfoMessage(chargingMessage4); +//// ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +//// threadPoolExecutor2.execute(new Runnable() { +//// @Override +//// public void run() { +//// chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); +//// +//// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); +//// } +//// }); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// break; +// case SendTagConstant.CHARGING_HANDSHAKE: +// ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); +// log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); +// // 持久化消息 +// ChargingHandshake chargingHandshake = new ChargingHandshake(); +// BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); +// chargingHandshakeService.create(chargingHandshake); +// break; +// case SendTagConstant.PARAMETER_SETTING: +// ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); +// log.info("业务消息处理:{}",parameterSettingMessage); +// // 持久化消息 +// ParameterSetting parameterSetting = new ParameterSetting(); +// BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); +// parameterSettingService.create(parameterSetting); +// break; +// case SendTagConstant.BMS_ABORT: +// BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); +// log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); +// // 持久化消息 +// BmsAbort bmsAbort = new BmsAbort(); +// BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); +// bmsAbortService.create(bmsAbort); +// +// ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor3.execute(new Runnable() { // @Override // public void run() { -// try { -// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); -// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); -// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); -// }catch (Exception e){ -// e.printStackTrace(); -// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); -// } +// // 业务处理 +// chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); // } // }); - break; - case SendTagConstant.ERROR_MESSAGE: - ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); - log.info("错误报文-业务消息处理:{}",errorMessageMessage1); - // 持久化消息 - ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); - BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); - errorMessageMessageService.create(errorMessageMessage); - break; - case SendTagConstant.BILLING_MODE_VERIFY: - BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); - log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); - // 持久化消息 - BillingModeVerify billingModeVerify = new BillingModeVerify(); - BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); - billingModeVerifyService.create(billingModeVerify); - break; - case SendTagConstant.ACQUISITION_BILLING_MODE: - AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); - log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); - // 持久化消息 - AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); - BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); - acquisitionBillingModeService.create(acquisitionBillingMode); - break; - case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: - try { - UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); - log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); - // 持久化消息 - UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); - BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); - // 查询mogondb上一条数据 - UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); - // 查询订单 - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); - // 查询当前时间段的计费策略 - TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); - uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); - uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); - if (Objects.nonNull(data)) { - uploadRealTimeMonitoringDataService.updateById(data.getId()); - uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); - uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); - uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); - }else { - log.info("首次上传实时监测数据"); - uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); - uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); - uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); - } - uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); - uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); - uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); -// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); -// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); - int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); - if(i == 0){ - log.error("数据存储mongo失败"); - } - - // 业务处理 - UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); - BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); - chargingOrderClient.chargeMonitoring(query); - // 订单id - ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); - chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); - // 推送充电订单信息 - ChargingMessage chargingMessage4 = new ChargingMessage(); - chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); - chargingMessage4.setOrderMessage(chargingOrderMessage3); - enhanceProduce.orderInfoMessage(chargingMessage4); -// ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); -// threadPoolExecutor2.execute(new Runnable() { -// @Override -// public void run() { -// chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); +// break; +// case SendTagConstant.MOTOR_ABORT: +// MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); +// log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); +// // 持久化消息 +// MotorAbort motorAbort = new MotorAbort(); +// BeanUtils.copyProperties(motorAbortMessage,motorAbort); +// motorAbortService.create(motorAbort); +// // 业务处理 +// chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); +// break; +// case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: +// BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); +// log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); +// // 持久化消息 +// BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); +// BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); +// bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // -// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); -// } -// }); - } catch (Exception e) { - e.printStackTrace(); - } - break; - case SendTagConstant.CHARGING_HANDSHAKE: - ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); - log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); - // 持久化消息 - ChargingHandshake chargingHandshake = new ChargingHandshake(); - BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); - chargingHandshakeService.create(chargingHandshake); - break; - case SendTagConstant.PARAMETER_SETTING: - ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); - log.info("业务消息处理:{}",parameterSettingMessage); - // 持久化消息 - ParameterSetting parameterSetting = new ParameterSetting(); - BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); - parameterSettingService.create(parameterSetting); - break; - case SendTagConstant.BMS_ABORT: - BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); - log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); - // 持久化消息 - BmsAbort bmsAbort = new BmsAbort(); - BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); - bmsAbortService.create(bmsAbort); - - ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor3.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); - } - }); - break; - case SendTagConstant.MOTOR_ABORT: - MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); - log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); - // 持久化消息 - MotorAbort motorAbort = new MotorAbort(); - BeanUtils.copyProperties(motorAbortMessage,motorAbort); - motorAbortService.create(motorAbort); - // 业务处理 - chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); - break; - case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: - BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); - log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); - // 持久化消息 - BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); - BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); - bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); - - // 业务处理 - TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderBms)){ - chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); - chargingOrderClient.updateChargingOrder(chargingOrderBms); - } - break; - case SendTagConstant.BMS_INFORMATION: - BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); - log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); - // 持久化消息 - BmsInformation bmsInformation = new BmsInformation(); - BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); - bmsInformationService.create(bmsInformation); - break; - case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: - ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); - log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); - // 持久化消息 - ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); - BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); - chargingPileStartsChargingService.create(chargingPileStartsCharging); - break; - case SendTagConstant.PLATFORM_START_CHARGING_REPLY: - PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); - log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); - // 持久化消息 - PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); - BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); - platformStartChargingReplyService.create(platformStartChargingReply); - - // 业务处理 - PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); - BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); - chargingOrderClient.startChargeSuccessfully(message1); - break; - case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: - PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); - log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); - // 持久化消息 - PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); - BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); - platformStopChargingReplyService.create(platformStopChargingReply); - - PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); - BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); - chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); - break; - case SendTagConstant.TRANSACTION_RECORD: - TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); - log.info("交易记录-业务消息处理:{}",transactionRecordMessage); - transactionRecordMessage.setResult(JSONObject.toJSONString(message)); - // 持久化消息 - TransactionRecord transactionRecord = new TransactionRecord(); - BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); - transactionRecordService.create(transactionRecord); - - // 业务处理 - TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderRecord)){ - chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); - chargingOrderClient.updateChargingOrder(chargingOrderRecord); - } - //计算费用 - TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); - BeanUtils.copyProperties(transactionRecordMessage,vo); - int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); - if(200 != code){ - //失败后添加到队列中继续处理数据 - redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); - } - - - // 添加实时上传记录结束记录 - // 查询mogondb上一条数据 - UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); - if(Objects.nonNull(data) && data.getStatus() != 5){ - UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); - BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); - uploadRealTimeMonitoringData.setStatus(5); - uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); - } - break; - case SendTagConstant.UPDATE_BALANCE_REPLY: - UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); - log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); - // 持久化消息 - UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); - BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); - updateBalanceReplyService.create(updateBalanceReply); - break; - case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: - SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); - log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); - // 持久化消息 - SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); - BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); - synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); - break; - case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: - ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); - log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); - // 持久化消息 - ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); - BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); - clearOfflineCardReplyService.create(clearOfflineCardReply); - break; - case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: - WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); - log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); - // 持久化消息 - WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); - BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); - workingParameterSettingReplyService.create(workingParameterSettingReply); - break; - case SendTagConstant.TIMING_SETTING: - TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); - log.info("对时设置-业务消息处理:{}",timingSettingMessage); - // 持久化消息 - TimingSetting timingSetting = new TimingSetting(); - BeanUtils.copyProperties(timingSettingMessage,timingSetting); - timingSettingService.create(timingSetting); - break; - case SendTagConstant.SETUP_BILLING_MODEL_REPLY: - SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); - log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); - // 持久化消息 - SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); - BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); - setupBillingModelReplyService.create(setupBillingModelReply); - break; - case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: - GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); - log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); - // 持久化消息 - GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); - BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); - groundLockRealTimeDataService.create(groundLockRealTimeData); - break; - case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: - ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); - log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); - // 持久化消息 - ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); - BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); - chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); - break; - case SendTagConstant.PLATFORM_RESTART_REPLY: - PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); - log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); - // 持久化消息 - PlatformRestartReply platformRestartReply = new PlatformRestartReply(); - BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); - platformRestartReplyService.create(platformRestartReply); - break; - case SendTagConstant.QR_CODE_DELIVERY_REPLY: - QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); - log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); - QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); - BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); - qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); - break; - case SendTagConstant.SECURITY_DETECTION: - SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); - log.info("安全监测-业务消息处理:{}",securityDetectionMessage); - SecurityDetection securityDetection = new SecurityDetection(); - BeanUtils.copyProperties(securityDetectionMessage,securityDetection); - securityDetectionService.create(securityDetection); - - SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); - BeanUtils.copyProperties(securityDetection, securityDetection1); - chargingOrderClient.securityDetection(securityDetection1); - break; - default: - PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); - log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); - PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); - BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); - platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); - break; - } - } - - -} \ No newline at end of file +// // 业务处理 +// TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); +// if(Objects.nonNull(chargingOrderBms)){ +// chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); +// chargingOrderClient.updateChargingOrder(chargingOrderBms); +// } +// break; +// case SendTagConstant.BMS_INFORMATION: +// BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); +// log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); +// // 持久化消息 +// BmsInformation bmsInformation = new BmsInformation(); +// BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); +// bmsInformationService.create(bmsInformation); +// break; +// case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: +// ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); +// log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); +// // 持久化消息 +// ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); +// BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); +// chargingPileStartsChargingService.create(chargingPileStartsCharging); +// break; +// case SendTagConstant.PLATFORM_START_CHARGING_REPLY: +// PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); +// log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); +// // 持久化消息 +// PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); +// BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); +// platformStartChargingReplyService.create(platformStartChargingReply); +// +// // 业务处理 +// PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); +// BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); +// chargingOrderClient.startChargeSuccessfully(message1); +// break; +// case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: +// PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); +// log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); +// // 持久化消息 +// PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); +// BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); +// platformStopChargingReplyService.create(platformStopChargingReply); +// +// PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); +// BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); +// chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); +// break; +// case SendTagConstant.TRANSACTION_RECORD: +// TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); +// log.info("交易记录-业务消息处理:{}",transactionRecordMessage); +// transactionRecordMessage.setResult(JSONObject.toJSONString(message)); +// // 持久化消息 +// TransactionRecord transactionRecord = new TransactionRecord(); +// BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); +// transactionRecordService.create(transactionRecord); +// +// // 业务处理 +// TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); +// if(Objects.nonNull(chargingOrderRecord)){ +// chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); +// chargingOrderClient.updateChargingOrder(chargingOrderRecord); +// } +// //计算费用 +// TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); +// BeanUtils.copyProperties(transactionRecordMessage,vo); +// int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); +// if(200 != code){ +// //失败后添加到队列中继续处理数据 +// redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); +// } +// +// +// // 添加实时上传记录结束记录 +// // 查询mogondb上一条数据 +// UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); +// if(Objects.nonNull(data) && data.getStatus() != 5){ +// UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); +// BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); +// uploadRealTimeMonitoringData.setStatus(5); +// uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); +// } +// break; +// case SendTagConstant.UPDATE_BALANCE_REPLY: +// UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); +// log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); +// // 持久化消息 +// UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); +// BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); +// updateBalanceReplyService.create(updateBalanceReply); +// break; +// case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: +// SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); +// log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); +// // 持久化消息 +// SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); +// BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); +// synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); +// break; +// case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: +// ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); +// log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); +// // 持久化消息 +// ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); +// BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); +// clearOfflineCardReplyService.create(clearOfflineCardReply); +// break; +// case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: +// WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); +// log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); +// // 持久化消息 +// WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); +// BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); +// workingParameterSettingReplyService.create(workingParameterSettingReply); +// break; +// case SendTagConstant.TIMING_SETTING: +// TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); +// log.info("对时设置-业务消息处理:{}",timingSettingMessage); +// // 持久化消息 +// TimingSetting timingSetting = new TimingSetting(); +// BeanUtils.copyProperties(timingSettingMessage,timingSetting); +// timingSettingService.create(timingSetting); +// break; +// case SendTagConstant.SETUP_BILLING_MODEL_REPLY: +// SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); +// log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); +// // 持久化消息 +// SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); +// BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); +// setupBillingModelReplyService.create(setupBillingModelReply); +// break; +// case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: +// GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); +// log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); +// // 持久化消息 +// GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); +// BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); +// groundLockRealTimeDataService.create(groundLockRealTimeData); +// break; +// case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: +// ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); +// log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); +// // 持久化消息 +// ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); +// BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); +// chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); +// break; +// case SendTagConstant.PLATFORM_RESTART_REPLY: +// PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); +// log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); +// // 持久化消息 +// PlatformRestartReply platformRestartReply = new PlatformRestartReply(); +// BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); +// platformRestartReplyService.create(platformRestartReply); +// break; +// case SendTagConstant.QR_CODE_DELIVERY_REPLY: +// QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); +// log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); +// QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); +// BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); +// qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); +// break; +// case SendTagConstant.SECURITY_DETECTION: +// SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); +// log.info("安全监测-业务消息处理:{}",securityDetectionMessage); +// SecurityDetection securityDetection = new SecurityDetection(); +// BeanUtils.copyProperties(securityDetectionMessage,securityDetection); +// securityDetectionService.create(securityDetection); +// +// SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); +// BeanUtils.copyProperties(securityDetection, securityDetection1); +// chargingOrderClient.securityDetection(securityDetection1); +// break; +// default: +// PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); +// log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); +// PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); +// BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); +// platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); +// break; +// } +// } +// +// +//} \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java index f54f22e..9684685 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java @@ -64,7 +64,7 @@ Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 - log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } diff --git a/ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java b/ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java index c4bcc6b..3d74e3e 100644 --- a/ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java +++ b/ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java @@ -30,10 +30,12 @@ import com.ruoyi.other.api.domain.Operator; import com.ruoyi.other.api.feignClient.OperatorClient; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cglib.core.Local; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; import javax.annotation.Resource; @@ -50,395 +52,5 @@ @Slf4j @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RuoYiIntegrationApplication.class) public class RuoYiIntegrationApplicationTests { - @Resource - private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; - - @Resource - private ParkingOrderService parkingOrderService; - @Resource - private SiteClient siteClient; - @Resource - private ChargingGunClient chargingGunClient; - @Resource - private ChargingOrderClient chargingOrderClient; - - @Resource - private OperatorClient operatorClient; - @Test - public void test(){ - /** - * 推送充电设备接口状态信息 - * @param chargingGun - * @return - */ - List<TChargingGun> data = chargingGunClient.getAllGun().getData(); - TChargingGun chargingGun = data.stream().filter(e -> e.getId() == 61).findFirst().orElse(new TChargingGun()); - ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo(); - connectorStatusInfo.setOperatorID("906171535"); - connectorStatusInfo.setEquipmentOwnerID("906171535"); - connectorStatusInfo.setStationID(String.valueOf(chargingGun.getSiteId())); - connectorStatusInfo.setEquipmentID(String.valueOf(chargingGun.getChargingPileId())); - connectorStatusInfo.setConnectorID(chargingGun.getFullNumber()); - connectorStatusInfo.setEquipmentClassification(1); - switch (chargingGun.getStatus()){ - case 1: - connectorStatusInfo.setStatus(0); - break; - case 2: - connectorStatusInfo.setStatus(1); - break; - case 3: - connectorStatusInfo.setStatus(2); - break; - case 4: - connectorStatusInfo.setStatus(3); - break; - case 5: - connectorStatusInfo.setStatus(3); - break; - case 6: - connectorStatusInfo.setStatus(4); - break; - case 7: - connectorStatusInfo.setStatus(255); - break; - } - connectorStatusInfo.setUpdateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); -// List<Operator> operators = operatorClient.getAllOperator().getData(); -// for (Operator operator : operators) { - tcecSuperviseUtil.notificationStationStatus(new Operator(), connectorStatusInfo); -// } - } - @Autowired - private TCECSuperviseUtil tcecSuperviseUtil; - private final static String operatorId = "906171535"; - - @Test - public void test1(){ - TChargingOrder chargingOrder = chargingOrderClient.orderDetail(1884874763556048898L).getData(); - SupEquipChargeStatus supEquipChargeStatus = new SupEquipChargeStatus(); - supEquipChargeStatus.setOperatorID(operatorId); - supEquipChargeStatus.setEquipmentOwnerID(operatorId); - supEquipChargeStatus.setStationID(String.valueOf(chargingOrder.getSiteId())); - supEquipChargeStatus.setEquipmentID(String.valueOf(chargingOrder.getChargingPileId())); - supEquipChargeStatus.setOrderNo(operatorId+chargingOrder.getCode()); - switch (chargingOrder.getStatus()){ - case 2: - supEquipChargeStatus.setConnectorStatus(1); - break; - case 3: - supEquipChargeStatus.setConnectorStatus(2); - break; - case 4: - supEquipChargeStatus.setConnectorStatus(3); - break; - case 5: - supEquipChargeStatus.setConnectorStatus(4); - break; - } - TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData(); - supEquipChargeStatus.setConnectorID(chargingGun.getFullNumber()); - supEquipChargeStatus.setEquipmentClassification(1); - supEquipChargeStatus.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - switch (chargingGun.getStatus()){ - case 1: - supEquipChargeStatus.setConnectorStatus(0); - break; - case 2: - supEquipChargeStatus.setConnectorStatus(1); - break; - case 3: - supEquipChargeStatus.setConnectorStatus(2); - break; - case 4: - supEquipChargeStatus.setConnectorStatus(3); - break; - case 5: - supEquipChargeStatus.setConnectorStatus(3); - break; - case 6: - supEquipChargeStatus.setConnectorStatus(4); - break; - case 7: - supEquipChargeStatus.setConnectorStatus(255); - break; - } - supEquipChargeStatus.setCurrentA(chargingOrder.getCurrent()); - supEquipChargeStatus.setSOC(StringUtils.hasLength(chargingOrder.getEndSoc())?new BigDecimal(chargingOrder.getEndSoc()):new BigDecimal("1")); - supEquipChargeStatus.setStartTime(chargingOrder.getStartTime() != null ? chargingOrder.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : ""); - supEquipChargeStatus.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - supEquipChargeStatus.setTotalPower(chargingOrder.getElectrovalence()); - tcecSuperviseUtil.notificationSupEquipChargeStatus(new Operator(), supEquipChargeStatus); - } - @Test - public void test2(){ - TChargingOrder chargingOrder = chargingOrderClient.orderDetail(1884874763556048898L).getData(); - SupChargeOrderInfo supChargeOrderInfo = new SupChargeOrderInfo(); - supChargeOrderInfo.setOperatorID(operatorId); - supChargeOrderInfo.setEquipmentOwnerID(operatorId); - supChargeOrderInfo.setStationID(String.valueOf(chargingOrder.getSiteId())); - supChargeOrderInfo.setEquipmentID(String.valueOf(chargingOrder.getChargingPileId())); - supChargeOrderInfo.setOrderNo(operatorId+chargingOrder.getCode()); - TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData(); - supChargeOrderInfo.setConnectorID(chargingGun.getFullNumber()); - supChargeOrderInfo.setEquipmentClassification(1); - supChargeOrderInfo.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - supChargeOrderInfo.setStartTime(chargingOrder.getStartTime() != null ? chargingOrder.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : ""); - supChargeOrderInfo.setEndTime(chargingOrder.getEndTime() != null ? chargingOrder.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : ""); - supChargeOrderInfo.setTotalPower(chargingOrder.getElectrovalence()); - supChargeOrderInfo.setTotalElecMoney(chargingOrder.getElectrovalence()); - supChargeOrderInfo.setTotalServiceMoney(chargingOrder.getServiceCharge()); - supChargeOrderInfo.setTotalMoney(chargingOrder.getOrderAmount()); - supChargeOrderInfo.setOrderStatus(chargingOrder.getStatus()); - switch (chargingOrder.getEndMode()){ - case 0: - supChargeOrderInfo.setStopReason(5); - supChargeOrderInfo.setStopDesc("异常终止"); - break; - case 1: - supChargeOrderInfo.setStopReason(0); - supChargeOrderInfo.setStopDesc("用户手动停止充电"); - break; - case 2: - supChargeOrderInfo.setStopReason(1); - supChargeOrderInfo.setStopDesc("客户归属地运营商平台停止充电"); - break; - case 3: - supChargeOrderInfo.setStopReason(1); - supChargeOrderInfo.setStopDesc("费用不足中止"); - break; - } - tcecSuperviseUtil.notificationChargeOrderInfo(new Operator(), supChargeOrderInfo); - } - @Test - public void test3(){ - StationStatsInfoResult res = new StationStatsInfoResult(); - List<Site> data = siteClient.getSiteAll().getData(); - LocalDateTime now = LocalDateTime.now(); - LocalDateTime startLocalDateTime = now.minusDays(1); - LocalDateTime endLocalDateTime = now.minusDays(1); - LocalDateTime localDateTime1 = LocalDateTime.of(2025, 2, 3, 0, 0, 0); - LocalDateTime localDateTime2 = LocalDateTime.of(2025, 2, 3, 23, 59, 59); - - - // 获取今天凌晨 - startLocalDateTime.withHour(0); - startLocalDateTime.withMinute(0); - startLocalDateTime.withSecond(0); - startLocalDateTime.withMonth(1); - startLocalDateTime.withDayOfMonth(28); - String start = DateUtils.localDateTimeToString(startLocalDateTime); - endLocalDateTime.withHour(23); - endLocalDateTime.withMinute(59); - endLocalDateTime.withSecond(59); - startLocalDateTime.withMonth(1); - startLocalDateTime.withDayOfMonth(28); - String end = DateUtils.localDateTimeToString(endLocalDateTime); - ChargingStatisticeDTO chargingStatisticeDTO = new ChargingStatisticeDTO(); - chargingStatisticeDTO.setStartTime(localDateTime1); - chargingStatisticeDTO.setEndTime(localDateTime2); - List<TChargingOrder> data1 = chargingOrderClient.getChargingStatistics(chargingStatisticeDTO).getData(); - List<StationStatsInfo> stationStatsInfos = new ArrayList<>(); - String start1 = DateUtils.localDateTimeToString(localDateTime1); - String start2 = DateUtils.localDateTimeToString(localDateTime2); - - for (Site datum : data) { - StationStatsInfo stationStatsInfo = new StationStatsInfo(); - stationStatsInfo.setStationID(datum.getId().toString()); - stationStatsInfo.setEquipmentOwnerID("906171535"); - stationStatsInfo.setOperatorID("906171535"); - stationStatsInfo.setStationClassification(1); - stationStatsInfo.setStartTime(start1); - stationStatsInfo.setEndTime(start2); - List<TChargingOrder> chargingOrders = data1.stream().filter(e -> e.getSiteId().equals(datum.getId())).collect(Collectors.toList()); - // 充电电量 - BigDecimal electricity = new BigDecimal("0"); - int chargingCount = 0; - for (TChargingOrder chargingOrder : chargingOrders) { - if (chargingOrder.getElectricity()!=null){ - electricity = electricity.add(chargingOrder.getElectricity()); - chargingCount++; - } - } - stationStatsInfo.setStationElectricity(electricity.divide(new BigDecimal("24"),4, BigDecimal.ROUND_DOWN)); - stationStatsInfo.setStationTotalChargeEnergy(electricity.setScale(4, BigDecimal.ROUND_DOWN)); - stationStatsInfo.setStationTotalWarningNum(0); - stationStatsInfo.setStationTotalOtherEnergy(new BigDecimal("0")); - stationStatsInfo.setStationTotalChargeNum(chargingCount); - //构建设备统计数据 - List<EquipmentStatsInfo> EquipmentStatsInfos = new ArrayList<>(); - Map<Integer, List<TChargingOrder>> collect = chargingOrders.stream().collect(Collectors.groupingBy(TChargingOrder::getChargingPileId)); - for (Integer integer : collect.keySet()) { - List<TChargingOrder> tChargingOrders = collect.get(integer); - BigDecimal reduce1 = tChargingOrders.stream().map(TChargingOrder::getChargingCapacity).reduce(BigDecimal.ZERO, BigDecimal::add); - EquipmentStatsInfo equipmentStatsInfo = new EquipmentStatsInfo(); - equipmentStatsInfo.setEquipmentClassification(1); - long chargingTime = 0L; - for (TChargingOrder tChargingOrder : tChargingOrders) { - // 累加充电时长 - LocalDateTime startTime = tChargingOrder.getStartTime(); - LocalDateTime endTime = tChargingOrder.getEndTime(); - // 计算时间差 单位分钟 - chargingTime += ChronoUnit.SECONDS.between(startTime, endTime)/60; - } - equipmentStatsInfo.setEquipmentTotalChargeTime(chargingTime); - equipmentStatsInfo.setEquipmentTotalChargeNum(tChargingOrders.size()); - equipmentStatsInfo.setEquipmentTotalWarningNum(0); - equipmentStatsInfo.setEquipmentID(integer.toString()); - equipmentStatsInfo.setEquipmentElectricity(reduce1); - //构建设备接口统计数据 - Map<Integer, List<TChargingOrder>> collect2 = tChargingOrders.stream().collect(Collectors.groupingBy(TChargingOrder::getChargingGunId)); - List<ConnectorStatsInfo> ConnectorStatsInfos = new ArrayList<>(); - for (Integer integer1 : collect2.keySet()) { - List<TChargingOrder> tChargingOrders1 = collect2.get(integer1); - BigDecimal reduce2 = tChargingOrders1.stream().map(TChargingOrder::getChargingCapacity).reduce(BigDecimal.ZERO, BigDecimal::add); - long chargingTime1 = 0L; - for (TChargingOrder chargingOrder : tChargingOrders1) { - // 累加充电时长 - LocalDateTime startTime = chargingOrder.getStartTime(); - LocalDateTime endTime = chargingOrder.getEndTime(); - // 计算时间差 单位分钟 - chargingTime += ChronoUnit.SECONDS.between(startTime, endTime)/60; - } - TChargingGun chargingGun = chargingGunClient.getChargingGunById(integer1).getData(); - ConnectorStatsInfo connectorStatsInfo = new ConnectorStatsInfo(); - connectorStatsInfo.setConnectorID(chargingGun.getFullNumber()); - connectorStatsInfo.setConnectorElectricity(reduce2); - connectorStatsInfo.setConnectorTotalChargeTime(Integer.valueOf(chargingTime+"")); - connectorStatsInfo.setConnectorTotalChargeNum(tChargingOrders1.size()); - connectorStatsInfo.setConnectorTotalWarningNum(0); - ConnectorStatsInfos.add(connectorStatsInfo); - } - equipmentStatsInfo.setConnectorStatsInfos(ConnectorStatsInfos); - EquipmentStatsInfos.add(equipmentStatsInfo); - } - stationStatsInfo.setEquipmentStatsInfos(EquipmentStatsInfos); - stationStatsInfos.add(stationStatsInfo); - } - res.setStationStatsInfos(stationStatsInfos); - tcecSuperviseUtil.superviseNotificationOperationStatsInfo(res); - } - @Resource - private ChargingPileClient chargingPileClient; - @Test - public void test4(){ - List<Site> data = siteClient.getSiteAll().getData(); - List<Integer> siteIds = data.stream().map(Site::getId).collect(Collectors.toList()); - List<TChargingPile> tChargingPiles = chargingPileClient.getChargingPileBySiteIds(siteIds).getData(); - List<Integer> collect1 = tChargingPiles.stream().map(TChargingPile::getId).collect(Collectors.toList()); - List<TChargingGun> chargingGunList = chargingGunClient.getChargingGunByChargingPileIds(collect1).getData(); - SupStationPowerInfoResult supStationPowerInfoResult = new SupStationPowerInfoResult(); - List<SupStationPowerInfo> stationStatsInfos = new ArrayList<>(); - LocalDateTime now = LocalDateTime.of(2025, 2, 3, 23, 30, 0); - LocalDateTime startLocalDateTime = LocalDateTime.of(2025, 2, 3, 20, 0, 0); - ChargingStatisticeDTO chargingStatisticeDTO = new ChargingStatisticeDTO(); - chargingStatisticeDTO.setStartTime(startLocalDateTime); - chargingStatisticeDTO.setEndTime(now); - List<TChargingOrder> data1 = chargingOrderClient.getChargingStatistics(chargingStatisticeDTO).getData(); - for (Site datum : data) { - List<TChargingOrder> collect = data1.stream().filter(e -> e.getSiteId().equals(datum.getId()) - &&e.getChargingPower()!=null).collect(Collectors.toList()); - SupStationPowerInfo supStationPowerInfo = new SupStationPowerInfo(); - supStationPowerInfo.setOperatorID("906171535"); - supStationPowerInfo.setEquipmentOwnerID("906171535"); - supStationPowerInfo.setStationID(datum.getId().toString()); - supStationPowerInfo.setStationClassification(1); - supStationPowerInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now())); - if (collect.isEmpty()){ - supStationPowerInfo.setStationRealTimePower(new BigDecimal("0")); - }else{ - BigDecimal divide = collect.stream().map(TChargingOrder::getChargingPower).reduce(BigDecimal.ZERO,BigDecimal::add).divide(new BigDecimal(collect.size()),4,BigDecimal.ROUND_DOWN); - supStationPowerInfo.setStationRealTimePower(divide); - } - supStationPowerInfo.setEquipmentPowerInfos(buildEquipmentPowerInfo(datum.getId(), tChargingPiles, chargingGunList)); - - stationStatsInfos.add(supStationPowerInfo); - } - supStationPowerInfoResult.setSupStationPowerInfos(stationStatsInfos); - tcecSuperviseUtil.superviseNotificationRealtimePowerInfo(supStationPowerInfoResult); - } - /** - * 构建桩数据 - * @param tChargingPiles - * @return - */ - public List<SupEquipmentPowerInfo> buildEquipmentPowerInfo(Integer siteId, List<TChargingPile> tChargingPiles, List<TChargingGun> chargingGunList){ - List<SupEquipmentPowerInfo> equipmentInfos = new ArrayList<>(); - List<TChargingPile> collect = tChargingPiles.stream().filter(s -> s.getSiteId().equals(siteId)).collect(Collectors.toList()); - for (TChargingPile tChargingPile : collect) { - SupEquipmentPowerInfo equipmentInfo = new SupEquipmentPowerInfo(); - equipmentInfo.setEquipmentID(tChargingPile.getId().toString()); - equipmentInfo.setEquipmentClassification(1); - equipmentInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now())); - equipmentInfo.setEquipRealTimePower(tChargingPile.getRatedPower()); - //构建设备接口信息 - equipmentInfo.setConnectorPowerInfos(buildConnectorPowerInfos(tChargingPile.getId(), tChargingPile.getCode(), chargingGunList)); - equipmentInfos.add(equipmentInfo); - } - return equipmentInfos; - } - public List<SupConnectorPowerInfo> buildConnectorPowerInfos(Integer chargingPileId, String code, List<TChargingGun> chargingGunList){ - List<SupConnectorPowerInfo> connectorInfos = new ArrayList<>(); - List<TChargingGun> collect = chargingGunList.stream().filter(s -> s.getChargingPileId().equals(chargingPileId)).collect(Collectors.toList()); - for (TChargingGun chargingGun : collect) { - SupConnectorPowerInfo connectorInfo = new SupConnectorPowerInfo(); - connectorInfo.setConnectorID(chargingGun.getFullNumber()); - connectorInfo.setEquipmentClassification(chargingGun.getEquipmentClassification()); - connectorInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now())); - connectorInfo.setConnectorRealTimePower(chargingGun.getChargingPower()); - connectorInfos.add(connectorInfo); - } - return connectorInfos; - } - - private final static String query_token = "/query_token"; - private static final String OperatorID = "MA01H3BQ2"; - private static final String OperatorSecret = "f1331ef0b37c2d1b"; - private static final String SigSecret = "a6fedf0e1b27d6f7"; - private static final String DataSecret = "50a61b93919c9604"; - private static final String DataSecretIV = "7c8ac6861661d584"; - private final static String url = "https://dev-gov-hlht-sc.unievbj.com/evcs/v1.0.0"; - @Test - public void test5(){ - HttpRequest post = HttpUtil.createPost(url + query_token); - JSONObject info = new JSONObject(); - info.put("OperatorID", "906171535"); - info.put("OperatorSecret", OperatorSecret); - Long timeStamp = Long.valueOf(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); - post.contentType("application/json;charset=utf-8"); - BaseRequestJianGuan baseRequest = new BaseRequestJianGuan(); - baseRequest.setOperatorID("906171535"); - baseRequest.setTimeStamp(timeStamp+""); - baseRequest.setSeq("0001"); - String jsonString = JacksonUtils.toJson(info); - SequenceGenerator generator = new SequenceGenerator(); - String nextSequence = generator.getNextSequence(); - String data = AesEncryption.encrypt(DataSecret, DataSecretIV,jsonString); - String hmacMD5 = HMacMD5Util.getHMacMD5("906171535",timeStamp+"", data,nextSequence,SigSecret); - baseRequest.setData(data); - baseRequest.setSig(hmacMD5); - String request_json = JacksonUtils.toJson(baseRequest); - log.info("获取三方平台授权token请求地址:" + post.getUrl()); - log.info("获取三方平台授权token请求参数:" + request_json); - log.info("获取三方平台授权token请求Data:" + jsonString); - post.body(request_json); - HttpResponse execute = post.execute(); - if(200 != execute.getStatus()){ - log.error("获取三方平台授权token失败:" + execute.body()); - } - log.info("获取三方平台授权token响应参数:" + execute.body()); - BaseResult baseResult = com.alibaba.fastjson.JSON.parseObject(execute.body(), BaseResult.class); - Integer Ret = baseResult.getRet(); - if(0 != Ret){ - log.error("获取三方平台授权token失败:" + baseResult.getMsg()); - } - //解密参数 - String decrypt = AESUtil.decrypt(baseResult.getData(), DataSecret, DataSecretIV); - log.info("获取三方平台授权token响应Data:" + decrypt); - QueryTokenResult queryTokenResult = JSON.parseObject(decrypt, QueryTokenResult.class); - String token = queryTokenResult.getAccessToken(); -// Long tokenAvailableTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + queryTokenResult.getTokenAvailab - System.err.println(token); - } } diff --git a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java index c6549e6..764dd6d 100644 --- a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java +++ b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java @@ -2332,6 +2332,15 @@ return chargingOrderService.getNotPaymentChargingOrder(); } - + + /** + * 手动推送订单给三方平台 + * @param code + * @return + */ + @PostMapping("/pushOrderInfo") + public R pushOrderInfo(@RequestParam String code){ + return chargingOrderService.pushOrderInfo(code); + } } diff --git a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java index 509ce4b..a1652ce 100644 --- a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java +++ b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java @@ -310,4 +310,13 @@ * @return */ R getNotPaymentChargingOrder(); + + + /** + * + * 手动推送订单给第三方平台 + * @param code + * @return + */ + R pushOrderInfo(String code); } diff --git a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java index 5a98a2a..07f670a 100644 --- a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java +++ b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java @@ -697,15 +697,15 @@ log.info(chargingOrder.getCode() + ":-------------------远程调起开始充电请求-------------------" + platformStartCharging.toString()); sendMessageClient.platformStartCharging(platformStartCharging); - //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 - Long id = chargingOrder.getId(); - //执行5分钟的定时任务检测 - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate(()->{ - if(timingDetection(id)){ - scheduler.shutdown(); - } - }, 5, 1, TimeUnit.SECONDS); +// //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 +// Long id = chargingOrder.getId(); +// //执行5分钟的定时任务检测 +// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); +// scheduler.scheduleAtFixedRate(()->{ +// if(timingDetection(id)){ +// scheduler.shutdown(); +// } +// }, 5, 1, TimeUnit.SECONDS); return AjaxResult.success(); } @@ -3890,15 +3890,15 @@ platformStartCharging.setAccount_balance(rechargeAmount); log.info(chargingOrder.getCode() + ":-------------------远程调起开始充电请求-------------------" + platformStartCharging.toString()); sendMessageClient.platformStartCharging(platformStartCharging); - //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 - Long id = chargingOrder.getId(); - //执行5分钟的定时任务检测 - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate(()->{ - if(timingDetection(id)){ - scheduler.shutdown(); - } - }, 5, 1, TimeUnit.SECONDS); +// //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 +// Long id = chargingOrder.getId(); +// //执行5分钟的定时任务检测 +// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); +// scheduler.scheduleAtFixedRate(()->{ +// if(timingDetection(id)){ +// scheduler.shutdown(); +// } +// }, 5, 1, TimeUnit.SECONDS); //推送三方平台订单状态 tcecClient.notificationEquipChargeStatus(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId()); @@ -3967,4 +3967,21 @@ } return R.ok(mapList); } + + + /** + * + * 手动推送订单给第三方平台 + * @param code + * @return + */ + @Override + public R pushOrderInfo(String code) { + TChargingOrder chargingOrder = this.getOne(new LambdaQueryWrapper<TChargingOrder>().eq(TChargingOrder::getCode, code)); + tcecClient.notificationEquipChargeStatus(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId()); + tcecClient.notificationStopChargeResult(chargingOrder.getStartChargeSeq(), chargingOrder.getChargingGunId().toString(), + chargingOrder.getOperatorId()); + tcecClient.notificationChargeOrderInfo(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId()); + return R.ok(); + } } -- Gitblit v1.7.1