ruoyi-service/ruoyi-integration/pom.xml
@@ -116,30 +116,36 @@ <scope>test</scope> </dependency> <!--rocketmq--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.2.2.RELEASE</version> <exclusions> <exclusion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> </exclusion> <exclusion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> </exclusion> </exclusions> </dependency> <!-- <dependency>--> <!-- <groupId>com.alibaba.cloud</groupId>--> <!-- <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>--> <!-- <version>2.2.2.RELEASE</version>--> <!-- <exclusions>--> <!-- <exclusion>--> <!-- <groupId>org.apache.rocketmq</groupId>--> <!-- <artifactId>rocketmq-client</artifactId>--> <!-- </exclusion>--> <!-- <exclusion>--> <!-- <groupId>org.apache.rocketmq</groupId>--> <!-- <artifactId>rocketmq-acl</artifactId>--> <!-- </exclusion>--> <!-- </exclusions>--> <!-- </dependency>--> <!-- <dependency>--> <!-- <groupId>org.apache.rocketmq</groupId>--> <!-- <artifactId>rocketmq-client</artifactId>--> <!-- <version>4.7.1</version>--> <!-- </dependency>--> <!-- <dependency>--> <!-- <groupId>org.apache.rocketmq</groupId>--> <!-- <artifactId>rocketmq-acl</artifactId>--> <!-- <version>4.7.1</version>--> <!-- </dependency>--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.7.1</version> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.0</version> </dependency> <!--mongodb--> ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java
@@ -6,10 +6,6 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.cloud.stream.messaging.Source; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -24,7 +20,6 @@ @SpringBootApplication @EnableScheduling//开启定时任务 @EnableTransactionManagement//开启事务 @EnableBinding({ Source.class, Sink.class }) public class RuoYiIntegrationApplication { public static void main(String[] args) { SpringApplication.run(RuoYiIntegrationApplication.class, args); ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/drainage/TCECController.java
@@ -300,7 +300,7 @@ public BaseResult queryStationsInfo(@RequestBody BaseRequest baseRequest, HttpServletRequest request){ log.info("三方平台查询充电站信息请求参数:" + JacksonUtils.toJson(baseRequest)); //校验token和签名 BaseResult baseResult = requestCheckJianGuan(true, baseRequest, request); BaseResult baseResult = requestCheck(true, baseRequest, request); if(0 != baseResult.getRet()){ log.info("三方平台查询充电站信息响应Data:"); baseResult.setData(""); @@ -399,8 +399,8 @@ stationInfo.setCountryCode(StringUtils.isNotEmpty(datum.getCountryCode()) ? datum.getCountryCode() : "CN"); stationInfo.setAreaCode(datum.getDistrictsCode()); stationInfo.setAddress(datum.getAddress()); stationInfo.setStationTel(datum.getPhone()); stationInfo.setServiceTel(serviceTel); stationInfo.setStationTel(StringUtils.isNotEmpty(datum.getPhone()) ? datum.getPhone().split(",")[0] : ""); stationInfo.setServiceTel(StringUtils.isNotEmpty(serviceTel) ? serviceTel.split(",")[0] : ""); switch (datum.getSiteType()){ case 0: stationInfo.setStationType(StationTypeEnum.OTHER.getType()); @@ -588,7 +588,7 @@ * @return */ @PostMapping("/pushChargingGunStatus") public R pushChargingGunStatus(@RequestParam("fullNumber") String fullNumber, @RequestParam("status") Integer status){ public R pushChargingGunStatus(@RequestParam(value = "fullNumber") String fullNumber, @RequestParam(value = "status") Integer status){ ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo(); connectorStatusInfo.setConnectorID(fullNumber); switch (status){ @@ -1282,7 +1282,7 @@ } chargeDetail.setElecPrice(datum.getElectrovalence()); chargeDetail.setSevicePrice(datum.getServiceCharge()); chargeDetail.setDetailPower(datum.getChargingCapacity()); chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_UP)); chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice()); chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice()); chargeDetails.add(chargeDetail); @@ -1489,6 +1489,7 @@ 
notificationChargeOrderInfo.setStartChargeSeq(startChargeSeq); TChargingOrder chargingOrder = chargingOrderClient.getChargingOrderByStartChargeSeq(startChargeSeq).getData(); if(null == chargingOrder){ log.info("三方平台流水号获取订单失败"); return null; } TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData(); @@ -1556,7 +1557,7 @@ } chargeDetail.setElecPrice(datum.getElectrovalence()); chargeDetail.setSevicePrice(datum.getServiceCharge()); chargeDetail.setDetailPower(datum.getChargingCapacity()); chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_UP)); chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice()); chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice()); chargeDetails.add(chargeDetail); @@ -1751,7 +1752,7 @@ } chargeDetail.setElecPrice(datum.getElectrovalence()); chargeDetail.setSevicePrice(datum.getServiceCharge()); chargeDetail.setDetailPower(datum.getChargingCapacity()); chargeDetail.setDetailPower(datum.getChargingCapacity().setScale(2, BigDecimal.ROUND_HALF_EVEN)); chargeDetail.setDetailElecMoney(datum.getPeriodElectricPrice()); chargeDetail.setDetailSeviceMoney(datum.getPeriodServicePrice()); chargeDetails.add(chargeDetail); ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
@@ -15,7 +15,6 @@ import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.iotda.utils.tools.StrategyUtil; import com.ruoyi.integration.rocket.model.*; import com.ruoyi.integration.rocket.produce.ChargingMessageUtil; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; @@ -49,9 +48,6 @@ @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; @Resource private ChargingMessageUtil chargingMessageUtil; @@ -84,8 +80,9 @@ switch (service_id){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class); chargingMessage.setOnlineMessage(onlineMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.onlineMessage(onlineMessage); // chargingMessage.setOnlineMessage(onlineMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 // 业务处理 登录认证应答 OnlineReply onlineReply = new OnlineReply(); @@ -102,8 +99,9 @@ break; case SendTagConstant.PING: PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class); chargingMessage.setPingMessage(pingMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.pingMessage(pingMessage); // chargingMessage.setPingMessage(pingMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 Pong pong = new Pong(); pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); @@ -114,22 +112,25 @@ break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class); chargingMessage.setEndChargeMessage(endChargeMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.endChargeMessage(endChargeMessage); // chargingMessage.setEndChargeMessage(endChargeMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = 
enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class); chargingMessage.setErrorMessageMessage(errorMessageMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.errorMessageMessage(errorMessageMessage); // chargingMessage.setErrorMessageMessage(errorMessageMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class); chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.billingModeVerifyMessage(billingModeVerifyMessage); // chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ @@ -157,8 +158,9 @@ break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class); chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.acquisitionBillingModeMessage(acquisitionBillingModeMessage); // chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 List<TAccountingStrategyDetail> accountingStrategyDetails = 
accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); @@ -177,55 +179,63 @@ case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: log.info("充电实时数据上传"); UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class); chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.uploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); // chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class); chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.chargingHandshakeMessage(chargingHandshakeMessage); // chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class); chargingMessage.setParameterSettingMessage(parameterSettingMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.parameterSettingMessage(parameterSettingMessage); // chargingMessage.setParameterSettingMessage(parameterSettingMessage); // 
chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class); chargingMessage.setBmsAbortMessage(bmsAbortMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.bmsAbortMessage(bmsAbortMessage); // chargingMessage.setBmsAbortMessage(bmsAbortMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class); chargingMessage.setMotorAbortMessage(motorAbortMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.motorAbortMessage(motorAbortMessage); // chargingMessage.setMotorAbortMessage(motorAbortMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class); chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.bmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); // chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class); 
chargingMessage.setBmsInformationMessage(bmsInformationMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.bmsInformationMessage(bmsInformationMessage); // chargingMessage.setBmsInformationMessage(bmsInformationMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class); chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.chargingPileStartsChargingMessage(chargingPileStartsChargingMessage); // chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); @@ -239,23 +249,26 @@ break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class); chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.platformStartChargingReplyMessage(platformStartChargingReplyMessage); // chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage 
platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class); chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.platformStopChargingReplyMessage(platformStopChargingReplyMessage); // chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class); transactionRecordMessage.setResult(content.toJSONString()); chargingMessage.setTransactionRecordMessage(transactionRecordMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.transactionRecordMessage(transactionRecordMessage); // chargingMessage.setTransactionRecordMessage(transactionRecordMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); @@ -265,36 +278,41 @@ break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class); chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.updateBalanceReplyMessage(updateBalanceReplyMessage); // chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case 
SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class); chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.synchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); // chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class); chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.clearOfflineCardReplyMessage(clearOfflineCardReplyMessage); // chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class); chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.workingParameterSettingReplyMessage(workingParameterSettingReplyMessage); // chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case 
SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class); chargingMessage.setTimingSettingMessage(timingSettingMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.timingSettingMessage(timingSettingMessage); // chargingMessage.setTimingSettingMessage(timingSettingMessage); // chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 对时设置应答 TimingSettingReply timingSettingReply = new TimingSettingReply(); timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); @@ -304,55 +322,62 @@ break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class); chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.setupBillingModelReplyMessage(setupBillingModelReplyMessage); // chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class); chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.groundLockRealTimeDataMessage(groundLockRealTimeDataMessage); // chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: 
ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class); chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.chargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); // chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class); chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.platformRestartReplyMessage(platformRestartReplyMessage); // chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class); chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.qrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); // chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = 
JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class); chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.securityDetectionMessage(securityDetectionMessage); // chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class); chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); sendResult = enhanceProduce.platformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); // chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); // chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; } // log.info("rocketmq消息下发结果:{}",sendResult); log.info("rocketmq消息下发结果:{}",sendResult); return AjaxResult.success(); } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java
@@ -30,76 +30,22 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_acquisition_billing_mode", topic = "charge_acquisition_billing_mode", selectorExpression = "acquisition_billing_mode", // 明确指定标签 consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "acquisition_billing_mode" ) public class AcquisitionBillingModeMessageListener extends EnhanceMessageHandler<AcquisitionBillingModeMessage> implements RocketMQListener<AcquisitionBillingModeMessage> { public class AcquisitionBillingModeMessageListener implements RocketMQListener<AcquisitionBillingModeMessage> { @Autowired private AcquisitionBillingModeService acquisitionBillingModeService; @Autowired private AccountingStrategyDetailClient accountingStrategyDetailClient; @Autowired private IotMessageProduce iotMessageProduce; @Autowired private MessageUtil messageUtil; @Override protected void handleMessage(AcquisitionBillingModeMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电桩计费模型请求-业务消息处理:{}",message); // 持久化消息 AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); BeanUtils.copyProperties(message,acquisitionBillingMode); acquisitionBillingModeService.create(acquisitionBillingMode); // 业务处理 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 // List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(message.getCharging_pile_code()).getData(); // Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); // // 价格设置 // AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply(); // StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply); // // 时段设置 // StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply); // iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), 
ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply)); } @Override protected void handleMaxRetriesExceeded(AcquisitionBillingModeMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(AcquisitionBillingModeMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(AcquisitionBillingModeMessage message) { super.dispatchMessage(message); log.info("充电桩计费模型请求-业务消息处理:{}",message); // 持久化消息 AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); BeanUtils.copyProperties(message,acquisitionBillingMode); acquisitionBillingModeService.create(acquisitionBillingMode); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java
@@ -27,63 +27,22 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_billing_mode_verify", topic = "charge_billing_mode_verify", selectorExpression = "billing_mode_verify", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "billing_mode_verify" ) public class BillingModeVerifyMessageListener extends EnhanceMessageHandler<BillingModeVerifyMessage> implements RocketMQListener<BillingModeVerifyMessage> { public class BillingModeVerifyMessageListener implements RocketMQListener<BillingModeVerifyMessage> { @Autowired private BillingModeVerifyService billingModeVerifyService; @Override protected void handleMessage(BillingModeVerifyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("计费模型验证请求-业务消息处理:{}",message); // 持久化消息 BillingModeVerify billingModeVerify = new BillingModeVerify(); BeanUtils.copyProperties(message,billingModeVerify); billingModeVerifyService.create(billingModeVerify); // 业务处理 } @Override protected void handleMaxRetriesExceeded(BillingModeVerifyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(BillingModeVerifyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BillingModeVerifyMessage message) { super.dispatchMessage(message); log.info("计费模型验证请求-业务消息处理:{}",message); // 持久化消息 BillingModeVerify billingModeVerify = new BillingModeVerify(); BeanUtils.copyProperties(message,billingModeVerify); billingModeVerifyService.create(billingModeVerify); } } 
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java
@@ -15,6 +15,9 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @Component @@ -22,22 +25,21 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_bms_abort", topic = "charge_bms_abort", selectorExpression = "bms_abort", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "bms_abort" ) public class BmsAbortMessageListener extends EnhanceMessageHandler<BmsAbortMessage> implements RocketMQListener<BmsAbortMessage> { public class BmsAbortMessageListener implements RocketMQListener<BmsAbortMessage> { @Autowired private BmsAbortService bmsAbortService; @Resource private ChargingOrderClient chargingOrderClient; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override protected void handleMessage(BmsAbortMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 public void onMessage(BmsAbortMessage message) { log.info("充电阶段BMS中止-业务消息处理:{}",message); // 持久化消息 BmsAbort bmsAbort = new BmsAbort(); @@ -45,46 +47,5 @@ bmsAbortService.create(bmsAbort); // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); } @Override protected void handleMaxRetriesExceeded(BmsAbortMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(BmsAbortMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BmsAbortMessage message) { super.dispatchMessage(message); } } 
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java
@@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; @Slf4j @@ -23,69 +24,32 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_bms_demand_and_charger_exportation", topic = "charge_bms_demand_and_charger_exportation", selectorExpression = "bms_demand_and_charger_exportation", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "bms_demand_and_charger_exportation" ) public class BmsDemandAndChargerExportationMessageListener extends EnhanceMessageHandler<BmsDemandAndChargerExportationMessage> implements RocketMQListener<BmsDemandAndChargerExportationMessage> { public class BmsDemandAndChargerExportationMessageListener implements RocketMQListener<BmsDemandAndChargerExportationMessage> { @Autowired private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; @Autowired @Resource private ChargingOrderClient chargingOrderClient; @Override protected void handleMessage(BmsDemandAndChargerExportationMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",message); // 持久化消息 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(message,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // 业务处理 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrder)){ chargingOrder.setNeedElec(message.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrder); } } @Override protected void handleMaxRetriesExceeded(BmsDemandAndChargerExportationMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override 
protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(BmsDemandAndChargerExportationMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BmsDemandAndChargerExportationMessage message) { super.dispatchMessage(message); log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",message); // 持久化消息 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(message,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // 业务处理 TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderBms)){ chargingOrderBms.setNeedElec(message.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrderBms); } } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java
@@ -1,10 +1,8 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.BmsDemandAndChargerExportation; import com.ruoyi.integration.api.model.BmsInformation; import com.ruoyi.integration.mongodb.service.BmsInformationService; import com.ruoyi.integration.rocket.model.BmsInformationMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -19,62 +17,22 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_bms_information", topic = "charge_bms_information", selectorExpression = "bms_information", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "bms_information" ) public class BmsInformationMessageListener extends EnhanceMessageHandler<BmsInformationMessage> implements RocketMQListener<BmsInformationMessage> { public class BmsInformationMessageListener implements RocketMQListener<BmsInformationMessage> { @Autowired private BmsInformationService bmsInformationService; @Override protected void handleMessage(BmsInformationMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电过程BMS信息-业务消息处理:{}",message); // 持久化消息 BmsInformation bmsInformation = new BmsInformation(); BeanUtils.copyProperties(message,bmsInformation); bmsInformationService.create(bmsInformation); // 业务处理 } @Override protected void handleMaxRetriesExceeded(BmsInformationMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(BmsInformationMessage message) { // 
此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(BmsInformationMessage message) { super.dispatchMessage(message); log.info("充电过程BMS信息-业务消息处理:{}",message); // 持久化消息 BmsInformation bmsInformation = new BmsInformation(); BeanUtils.copyProperties(message,bmsInformation); bmsInformationService.create(bmsInformation); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java
@@ -19,62 +19,23 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_handshake", topic = "charge_charging_handshake", selectorExpression = "charging_handshake", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "charging_handshake" ) public class ChargingHandshakeMessageListener extends EnhanceMessageHandler<ChargingHandshakeMessage> implements RocketMQListener<ChargingHandshakeMessage> { public class ChargingHandshakeMessageListener implements RocketMQListener<ChargingHandshakeMessage> { @Autowired private ChargingHandshakeService chargingHandshakeService; @Override protected void handleMessage(ChargingHandshakeMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电握手-业务消息处理:{}",message); // 持久化消息 ChargingHandshake chargingHandshake = new ChargingHandshake(); BeanUtils.copyProperties(message,chargingHandshake); chargingHandshakeService.create(chargingHandshake); // 业务处理 } @Override protected void handleMaxRetriesExceeded(ChargingHandshakeMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ChargingHandshakeMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingHandshakeMessage message) { super.dispatchMessage(message); log.info("充电握手-业务消息处理:{}",message); // 持久化消息 ChargingHandshake chargingHandshake = new ChargingHandshake(); BeanUtils.copyProperties(message,chargingHandshake); chargingHandshakeService.create(chargingHandshake); } } 
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java
@@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_pile_returns_ground_lock_data", topic = "charge_charging_pile_returns_ground_lock_data", selectorExpression = "charging_pile_returns_ground_lock_data", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "charging_pile_returns_ground_lock_data" ) public class ChargingPileReturnsGroundLockDataMessageListener extends EnhanceMessageHandler<ChargingPileReturnsGroundLockDataMessage> implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> { public class ChargingPileReturnsGroundLockDataMessageListener implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> { @Autowired private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; @Override protected void handleMessage(ChargingPileReturnsGroundLockDataMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电桩返回数据(上行)-业务消息处理:{}",message); // 持久化消息 ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); BeanUtils.copyProperties(message,chargingPileReturnsGroundLockData); chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); // 业务处理 } @Override protected void handleMaxRetriesExceeded(ChargingPileReturnsGroundLockDataMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ChargingPileReturnsGroundLockDataMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingPileReturnsGroundLockDataMessage 
message) { super.dispatchMessage(message); log.info("充电桩返回数据(上行)-业务消息处理:{}",message); // 持久化消息 ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); BeanUtils.copyProperties(message,chargingPileReturnsGroundLockData); chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java
@@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_pile_starts_charging", topic = "charge_charging_pile_starts_charging", selectorExpression = "charging_pile_starts_charging", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "charging_pile_starts_charging" ) public class ChargingPileStartsChargingMessageListener extends EnhanceMessageHandler<ChargingPileStartsChargingMessage> implements RocketMQListener<ChargingPileStartsChargingMessage> { public class ChargingPileStartsChargingMessageListener implements RocketMQListener<ChargingPileStartsChargingMessage> { @Autowired private ChargingPileStartsChargingService chargingPileStartsChargingService; @Override protected void handleMessage(ChargingPileStartsChargingMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电桩主动申请启动充电-业务消息处理:{}",message); // 持久化消息 ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); BeanUtils.copyProperties(message,chargingPileStartsCharging); chargingPileStartsChargingService.create(chargingPileStartsCharging); // 业务处理 } @Override protected void handleMaxRetriesExceeded(ChargingPileStartsChargingMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ChargingPileStartsChargingMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingPileStartsChargingMessage message) { super.dispatchMessage(message); log.info("充电桩主动申请启动充电-业务消息处理:{}",message); // 持久化消息 ChargingPileStartsCharging chargingPileStartsCharging = new 
ChargingPileStartsCharging(); BeanUtils.copyProperties(message,chargingPileStartsCharging); chargingPileStartsChargingService.create(chargingPileStartsCharging); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java
@@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_clear_offline_card_reply", topic = "charge_clear_offline_card_reply", selectorExpression = "clear_offline_card_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "clear_offline_card_reply" ) public class ClearOfflineCardReplyMessageListener extends EnhanceMessageHandler<ClearOfflineCardReplyMessage> implements RocketMQListener<ClearOfflineCardReplyMessage> { public class ClearOfflineCardReplyMessageListener implements RocketMQListener<ClearOfflineCardReplyMessage> { @Autowired private ClearOfflineCardReplyService clearOfflineCardReplyService; @Override protected void handleMessage(ClearOfflineCardReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("离线卡数据清除应答-业务消息处理:{}",message); // 持久化消息 ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); BeanUtils.copyProperties(message,clearOfflineCardReply); clearOfflineCardReplyService.create(clearOfflineCardReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(ClearOfflineCardReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ClearOfflineCardReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ClearOfflineCardReplyMessage message) { super.dispatchMessage(message); log.info("离线卡数据清除应答-业务消息处理:{}",message); // 持久化消息 ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); BeanUtils.copyProperties(message,clearOfflineCardReply); 
clearOfflineCardReplyService.create(clearOfflineCardReply); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java
@@ -3,13 +3,17 @@ import com.ruoyi.integration.api.model.EndCharge; import com.ruoyi.integration.api.model.Ping; import com.ruoyi.integration.api.model.Pong; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.iotda.enums.ServiceIdMenu; import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce; import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.mongodb.service.EndChargeService; import com.ruoyi.integration.mongodb.service.PingService; import com.ruoyi.integration.rocket.model.ChargingMessage; import com.ruoyi.integration.rocket.model.ChargingOrderMessage; import com.ruoyi.integration.rocket.model.EndChargeMessage; import com.ruoyi.integration.rocket.model.PingMessage; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import lombok.extern.slf4j.Slf4j; @@ -28,21 +32,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_end_charge", topic = "charge_end_charge", selectorExpression = "end_charge", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "end_charge" ) public class EndChargeMessageListener extends EnhanceMessageHandler<EndChargeMessage> implements RocketMQListener<EndChargeMessage> { public class EndChargeMessageListener implements RocketMQListener<EndChargeMessage> { @Autowired private EndChargeService endChargeService; @Resource private ChargingOrderClient chargingOrderClient; @Autowired private EnhanceProduce enhanceProduce; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override protected void handleMessage(EndChargeMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 public void onMessage(EndChargeMessage message) { log.info("充电结束-业务消息处理:{}",message); // 持久化消息 EndCharge endCharge = new EndCharge(); @@ -50,46 +57,19 @@ 
endChargeService.create(endCharge); // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); } @Override protected void handleMaxRetriesExceeded(EndChargeMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(EndChargeMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(EndChargeMessage message) { super.dispatchMessage(message); // 订单id String transactionSerialNumber = endCharge.getTransaction_serial_number(); ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); chargingOrderMessage.setOrderNumber(transactionSerialNumber); // 推送充电订单信息 ChargingMessage chargingMessage1 = new ChargingMessage(); chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); chargingMessage1.setOrderMessage(chargingOrderMessage); enhanceProduce.orderInfoMessage(chargingMessage1); // 推送充电订单状态 ChargingMessage chargingMessage2 = new ChargingMessage(); chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); chargingMessage2.setOrderMessage(chargingOrderMessage); enhanceProduce.orderStatusMessage(chargingMessage2); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ErrorMessageMessageListener.java
@@ -17,64 +17,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_error_message", topic = "charge_error_message", selectorExpression = "error_message", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "error_message" ) public class ErrorMessageMessageListener extends EnhanceMessageHandler<ErrorMessageMessage> implements RocketMQListener<ErrorMessageMessage> { public class ErrorMessageMessageListener implements RocketMQListener<ErrorMessageMessage> { @Autowired private ErrorMessageMessageService errorMessageMessageService; @Override protected void handleMessage(ErrorMessageMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("错误报文-业务消息处理:{}",message); // 持久化消息 ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); BeanUtils.copyProperties(message,errorMessageMessage); errorMessageMessageService.create(errorMessageMessage); // 业务处理 } @Override protected void handleMaxRetriesExceeded(ErrorMessageMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ErrorMessageMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ErrorMessageMessage message) { super.dispatchMessage(message); log.info("错误报文-业务消息处理:{}",message); // 持久化消息 ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); BeanUtils.copyProperties(message,errorMessageMessage); errorMessageMessageService.create(errorMessageMessage); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java
@@ -19,62 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_ground_lock_real_time_data", topic = "charge_ground_lock_real_time_data", selectorExpression = "ground_lock_real_time_data", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "ground_lock_real_time_data" ) public class GroundLockRealTimeDataMessageListener extends EnhanceMessageHandler<GroundLockRealTimeDataMessage> implements RocketMQListener<GroundLockRealTimeDataMessage> { public class GroundLockRealTimeDataMessageListener implements RocketMQListener<GroundLockRealTimeDataMessage> { @Autowired private GroundLockRealTimeDataService groundLockRealTimeDataService; @Override protected void handleMessage(GroundLockRealTimeDataMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",message); // 持久化消息 GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); BeanUtils.copyProperties(message,groundLockRealTimeData); groundLockRealTimeDataService.create(groundLockRealTimeData); // 业务处理 } @Override protected void handleMaxRetriesExceeded(GroundLockRealTimeDataMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(GroundLockRealTimeDataMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(GroundLockRealTimeDataMessage message) { super.dispatchMessage(message); log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",message); // 持久化消息 GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); BeanUtils.copyProperties(message,groundLockRealTimeData); 
groundLockRealTimeDataService.create(groundLockRealTimeData); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java
@@ -22,10 +22,9 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_motor_abort", topic = "charge_motor_abort", selectorExpression = "motor_abort", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "motor_abort" ) public class MotorAbortMessageListener extends EnhanceMessageHandler<MotorAbortMessage> implements RocketMQListener<MotorAbortMessage> { public class MotorAbortMessageListener implements RocketMQListener<MotorAbortMessage> { @Autowired private MotorAbortService motorAbortService; @@ -34,11 +33,11 @@ private ChargingOrderClient chargingOrderClient; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override protected void handleMessage(MotorAbortMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 public void onMessage(MotorAbortMessage message) { log.info("充电阶段充电机中止-业务消息处理:{}",message); // 持久化消息 MotorAbort motorAbort = new MotorAbort(); @@ -46,46 +45,5 @@ motorAbortService.create(motorAbort); // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); } @Override protected void handleMaxRetriesExceeded(MotorAbortMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(MotorAbortMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(MotorAbortMessage message) { super.dispatchMessage(message); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java
@@ -1,5 +1,6 @@ package com.ruoyi.integration.rocket.listener; import com.alibaba.fastjson.JSON; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.mongodb.service.OnlineService; import com.ruoyi.integration.rocket.model.OnlineMessage; @@ -16,63 +17,22 @@ @RocketMQMessageListener( consumerGroup = "charge_online", topic = "charge_online", selectorExpression = "online", // 明确指定标签 consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "online" ) public class OnlineMessageListener extends EnhanceMessageHandler<OnlineMessage> implements RocketMQListener<OnlineMessage> { public class OnlineMessageListener implements RocketMQListener<OnlineMessage> { @Autowired private OnlineService onlineService; @Override protected void handleMessage(OnlineMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电桩登录认证业务消息处理:{}",message); // 持久化消息 Online online = new Online(); BeanUtils.copyProperties(message,online); onlineService.create(online); // 业务处理 } @Override protected void handleMaxRetriesExceeded(OnlineMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(OnlineMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(OnlineMessage message) { super.dispatchMessage(message); log.info("充电桩登录认证业务消息处理:{}", JSON.toJSONString(message)); // 持久化消息 Online online = new Online(); BeanUtils.copyProperties(message,online); onlineService.create(online); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java
@@ -17,65 +17,26 @@ @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "enhance_consumer_group", topic = "rocket_enhance", selectorExpression = "*", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 consumerGroup = "charge_parameter_setting", topic = "charge_parameter_setting", selectorExpression = "parameter_setting" ) public class ParameterSettingMessageListener extends EnhanceMessageHandler<ParameterSettingMessage> implements RocketMQListener<ParameterSettingMessage> { public class ParameterSettingMessageListener implements RocketMQListener<ParameterSettingMessage> { @Autowired private ParameterSettingService parameterSettingService; @Override protected void handleMessage(ParameterSettingMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("业务消息处理:{}",message); // 持久化消息 ParameterSetting parameterSetting = new ParameterSetting(); BeanUtils.copyProperties(message,parameterSetting); parameterSettingService.create(parameterSetting); // 业务处理 } @Override protected void handleMaxRetriesExceeded(ParameterSettingMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ParameterSettingMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ParameterSettingMessage message) { super.dispatchMessage(message); log.info("参数配置-业务消息处理:{}",message); // 持久化消息 ParameterSetting parameterSetting = new ParameterSetting(); BeanUtils.copyProperties(message,parameterSetting); parameterSettingService.create(parameterSetting); } } 
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java
@@ -1,18 +1,30 @@ package com.ruoyi.integration.rocket.listener; import com.alibaba.fastjson2.JSON; import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.integration.api.model.Ping; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.PingService; import com.ruoyi.integration.rocket.model.ChargingMessage; import com.ruoyi.integration.rocket.model.GunStatusMessage; import com.ruoyi.integration.rocket.model.OnlineMessage; import com.ruoyi.integration.rocket.model.PingMessage; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @Component @@ -20,62 +32,47 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_ping", topic = "charge_ping", selectorExpression = "ping", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "ping" ) public class PingMessageListener extends EnhanceMessageHandler<PingMessage> implements RocketMQListener<PingMessage> { public class PingMessageListener implements RocketMQListener<PingMessage> { @Autowired private PingService pingService; @Override protected void handleMessage(PingMessage message) throws Exception { // 
此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("充电桩心跳包-业务消息处理:{}",message); // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(message,ping); pingService.create(ping); // 业务处理 } @Override protected void handleMaxRetriesExceeded(PingMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(PingMessage message) { // 此处可做消息过滤 return false; } @Resource private RedisTemplate redisTemplate; @Autowired private EnhanceProduce enhanceProduce; @Resource private ChargingPileClient chargingPileClient; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PingMessage message) { super.dispatchMessage(message); log.info("充电桩心跳包-业务消息处理:{}", JSON.toJSONString(message)); // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(message,ping); pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(message.getCharging_gun_code()); vo1.setPile_code(message.getCharging_pile_code()); vo1.setStatus(message.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); // 监管平台推送充电设备状态 SendResult sendResult; String gunCode = message.getCharging_pile_code() + message.getCharging_gun_code(); ChargingMessage chargingMessage = new ChargingMessage(); chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); GunStatusMessage gunStatusMessage = new GunStatusMessage(); gunStatusMessage.setFullNumber(gunCode); chargingMessage.setGunStatusMessage(gunStatusMessage); sendResult = 
enhanceProduce.gunStatusMessage(chargingMessage); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java
@@ -1,6 +1,7 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.api.model.ParameterSetting; import com.ruoyi.integration.api.model.PlatformRemoteUpdateReply; import com.ruoyi.integration.mongodb.service.PlatformRemoteUpdateReplyService; import com.ruoyi.integration.rocket.model.PlatformRemoteUpdateReplyMessage; @@ -19,63 +20,23 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_remote_update_reply", topic = "charge_platform_remote_update_reply", selectorExpression = "platform_remote_update_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "platform_remote_update_reply" ) public class PlatformRemoteUpdateReplyMessageListener extends EnhanceMessageHandler<PlatformRemoteUpdateReplyMessage> implements RocketMQListener<PlatformRemoteUpdateReplyMessage> { public class PlatformRemoteUpdateReplyMessageListener implements RocketMQListener<PlatformRemoteUpdateReplyMessage> { @Autowired private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; @Override protected void handleMessage(PlatformRemoteUpdateReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("远程更新应答-业务消息处理:{}",message); // 持久化消息 PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); BeanUtils.copyProperties(message,platformRemoteUpdateReply); platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(PlatformRemoteUpdateReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected 
boolean filter(PlatformRemoteUpdateReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformRemoteUpdateReplyMessage message) { super.dispatchMessage(message); log.info("业务消息处理:{}",message); // 持久化消息 PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); BeanUtils.copyProperties(message,platformRemoteUpdateReply); platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java
@@ -19,63 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_restart_reply", topic = "charge_platform_restart_reply", selectorExpression = "platform_restart_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "platform_restart_reply" ) public class PlatformRestartReplyMessageListener extends EnhanceMessageHandler<PlatformRestartReplyMessage> implements RocketMQListener<PlatformRestartReplyMessage> { public class PlatformRestartReplyMessageListener implements RocketMQListener<PlatformRestartReplyMessage> { @Autowired private PlatformRestartReplyService platformRestartReplyService; @Override protected void handleMessage(PlatformRestartReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("远程重启应答-业务消息处理:{}",message); // 持久化消息 PlatformRestartReply platformRestartReply = new PlatformRestartReply(); BeanUtils.copyProperties(message,platformRestartReply); platformRestartReplyService.create(platformRestartReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(PlatformRestartReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(PlatformRestartReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformRestartReplyMessage message) { super.dispatchMessage(message); log.info("远程重启应答-业务消息处理:{}",message); // 持久化消息 PlatformRestartReply platformRestartReply = new PlatformRestartReply(); BeanUtils.copyProperties(message,platformRestartReply); platformRestartReplyService.create(platformRestartReply); } } 
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java
@@ -23,10 +23,9 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_start_charging_reply", topic = "charge_platform_start_charging_reply", selectorExpression = "platform_start_charging_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "platform_start_charging_reply" ) public class PlatformStartChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStartChargingReplyMessage> implements RocketMQListener<PlatformStartChargingReplyMessage> { public class PlatformStartChargingReplyMessageListener implements RocketMQListener<PlatformStartChargingReplyMessage> { @Autowired private PlatformStartChargingReplyService platformStartChargingReplyService; @@ -36,58 +35,20 @@ @Override protected void handleMessage(PlatformStartChargingReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("远程启机命令回复-业务消息处理:{}",message); // 持久化消息 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(message,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); // 业务处理 PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(message, message1); chargingOrderClient.startChargeSuccessfully(message1); } @Override protected void handleMaxRetriesExceeded(PlatformStartChargingReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(PlatformStartChargingReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override 
public void onMessage(PlatformStartChargingReplyMessage message) { super.dispatchMessage(message); log.info("远程启机命令回复-业务消息处理:{}",message); // 持久化消息 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(message,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); // 业务处理 PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(message, message1); chargingOrderClient.startChargeSuccessfully(message1); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java
@@ -5,6 +5,8 @@ import com.ruoyi.integration.mongodb.service.PlatformStopChargingReplyService; import com.ruoyi.integration.rocket.model.PlatformStopChargingReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -13,68 +15,37 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_platform_stop_charging_reply", topic = "charge_platform_stop_charging_reply", selectorExpression = "platform_stop_charging_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "platform_stop_charging_reply" ) public class PlatformStopChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStopChargingReplyMessage> implements RocketMQListener<PlatformStopChargingReplyMessage> { public class PlatformStopChargingReplyMessageListener implements RocketMQListener<PlatformStopChargingReplyMessage> { @Autowired private PlatformStopChargingReplyService platformStopChargingReplyService; @Override protected void handleMessage(PlatformStopChargingReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("远程停机命令回复-业务消息处理:{}",message); // 持久化消息 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(message,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(PlatformStopChargingReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 
log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(PlatformStopChargingReplyMessage message) { // 此处可做消息过滤 return false; } @Resource private ChargingOrderClient chargingOrderClient; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(PlatformStopChargingReplyMessage message) { super.dispatchMessage(message); log.info("远程停机命令回复-业务消息处理:{}",message); // 持久化消息 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(message,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QrCodeDeliveryReplyMessageListener.java
New file @@ -0,0 +1,40 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.QrCodeDeliveryReply; import com.ruoyi.integration.mongodb.service.QrCodeDeliveryReplyService; import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage; import com.ruoyi.integration.rocket.model.QrCodeDeliveryReplyMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author zhibing.pu * @Date 2025/4/28 14:57 */ @Slf4j @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_qr_code_delivery_reply", topic = "charge_qr_code_delivery_reply", selectorExpression = "qr_code_delivery_reply" ) public class QrCodeDeliveryReplyMessageListener implements RocketMQListener<QrCodeDeliveryReplyMessage> { @Autowired private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; @Override public void onMessage(QrCodeDeliveryReplyMessage message) { log.info("二维码下发应答-业务消息处理:{}",message); QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); BeanUtils.copyProperties(message,qrCodeDeliveryReply); qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java
@@ -1,10 +1,13 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.api.model.PlatformRemoteUpdateReply; import com.ruoyi.integration.api.model.PlatformStopChargingReply; import com.ruoyi.integration.api.model.QueryOfflineCardReply; import com.ruoyi.integration.mongodb.service.QueryOfflineCardReplyService; import com.ruoyi.integration.rocket.model.QueryOfflineCardReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -17,64 +20,27 @@ @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "enhance_consumer_group", topic = "rocket_enhance", selectorExpression = "*", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 consumerGroup = "charge_query_offline_card_reply", topic = "charge_query_offline_card_reply", selectorExpression = "query_offline_card_reply" ) public class QueryOfflineCardReplyMessageListener extends EnhanceMessageHandler<QueryOfflineCardReplyMessage> implements RocketMQListener<QueryOfflineCardReplyMessage> { public class QueryOfflineCardReplyMessageListener implements RocketMQListener<QueryOfflineCardReplyMessage> { @Autowired private QueryOfflineCardReplyService queryOfflineCardReplyService; @Override protected void handleMessage(QueryOfflineCardReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("离线卡数据查询应答-业务消息处理:{}",message); // 持久化消息 QueryOfflineCardReply queryOfflineCardReply = new QueryOfflineCardReply(); BeanUtils.copyProperties(message,queryOfflineCardReply); queryOfflineCardReplyService.create(queryOfflineCardReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(QueryOfflineCardReplyMessage message) { // 
当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(QueryOfflineCardReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(QueryOfflineCardReplyMessage message) { super.dispatchMessage(message); log.info("业务消息处理:{}",message); // 持久化消息 QueryOfflineCardReply queryOfflineCardReply = new QueryOfflineCardReply(); BeanUtils.copyProperties(message,queryOfflineCardReply); queryOfflineCardReplyService.create(queryOfflineCardReply); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SecurityDetectionMessageListener.java
New file @@ -0,0 +1,50 @@ package com.ruoyi.integration.rocket.listener; import com.ruoyi.integration.api.model.SecurityDetection; import com.ruoyi.integration.mongodb.service.SecurityDetectionService; import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage; import com.ruoyi.integration.rocket.model.SecurityDetectionMessage; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.vo.SecurityDetectionVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author zhibing.pu * @Date 2025/4/28 14:59 */ @Slf4j @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_security_detection", topic = "charge_security_detection", selectorExpression = "security_detection" ) public class SecurityDetectionMessageListener implements RocketMQListener<SecurityDetectionMessage> { @Resource private ChargingOrderClient chargingOrderClient; @Autowired private SecurityDetectionService securityDetectionService; @Override public void onMessage(SecurityDetectionMessage message) { log.info("安全监测-业务消息处理:{}",message); SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(message,securityDetection); securityDetectionService.create(securityDetection); SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); BeanUtils.copyProperties(securityDetection, securityDetection1); chargingOrderClient.securityDetection(securityDetection1); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java
@@ -19,63 +19,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_setup_billing_model_reply", topic = "charge_setup_billing_model_reply", selectorExpression = "setup_billing_model_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "setup_billing_model_reply" ) public class SetupBillingModelReplyMessageListener extends EnhanceMessageHandler<SetupBillingModelReplyMessage> implements RocketMQListener<SetupBillingModelReplyMessage> { public class SetupBillingModelReplyMessageListener implements RocketMQListener<SetupBillingModelReplyMessage> { @Autowired private SetupBillingModelReplyService setupBillingModelReplyService; @Override protected void handleMessage(SetupBillingModelReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("计费模型应答-业务消息处理:{}",message); // 持久化消息 SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); BeanUtils.copyProperties(message,setupBillingModelReply); setupBillingModelReplyService.create(setupBillingModelReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(SetupBillingModelReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(SetupBillingModelReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(SetupBillingModelReplyMessage message) { super.dispatchMessage(message); log.info("计费模型应答-业务消息处理:{}",message); // 持久化消息 SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); BeanUtils.copyProperties(message,setupBillingModelReply); 
setupBillingModelReplyService.create(setupBillingModelReply); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java
@@ -19,62 +19,25 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_synchronize_offline_card_reply", topic = "charge_synchronize_offline_card_reply", selectorExpression = "synchronize_offline_card_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "synchronize_offline_card_reply" ) public class SynchronizeOfflineCardReplyMessageListener extends EnhanceMessageHandler<SynchronizeOfflineCardReplyMessage> implements RocketMQListener<SynchronizeOfflineCardReplyMessage> { public class SynchronizeOfflineCardReplyMessageListener implements RocketMQListener<SynchronizeOfflineCardReplyMessage> { @Autowired private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; @Override protected void handleMessage(SynchronizeOfflineCardReplyMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("卡数据同步应答-业务消息处理:{}",message); // 持久化消息 SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); BeanUtils.copyProperties(message,synchronizeOfflineCardReply); synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); // 业务处理 } @Override protected void handleMaxRetriesExceeded(SynchronizeOfflineCardReplyMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(SynchronizeOfflineCardReplyMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(SynchronizeOfflineCardReplyMessage message) { super.dispatchMessage(message); log.info("卡数据同步应答-业务消息处理:{}",message); // 持久化消息 SynchronizeOfflineCardReply 
synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); BeanUtils.copyProperties(message,synchronizeOfflineCardReply); synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java
@@ -26,62 +26,24 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_timing_setting", topic = "charge_timing_setting", selectorExpression = "timing_setting", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "timing_setting" ) public class TimingSettingMessageListener extends EnhanceMessageHandler<TimingSettingMessage> implements RocketMQListener<TimingSettingMessage> { public class TimingSettingMessageListener implements RocketMQListener<TimingSettingMessage> { @Autowired private TimingSettingService timingSettingService; @Override protected void handleMessage(TimingSettingMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("对时设置-业务消息处理:{}",message); // 持久化消息 TimingSetting timingSetting = new TimingSetting(); BeanUtils.copyProperties(message,timingSetting); timingSettingService.create(timingSetting); // 业务处理 } @Override protected void handleMaxRetriesExceeded(TimingSettingMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(TimingSettingMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(TimingSettingMessage message) { super.dispatchMessage(message); log.info("对时设置-业务消息处理:{}",message); // 持久化消息 TimingSetting timingSetting = new TimingSetting(); BeanUtils.copyProperties(message,timingSetting); timingSettingService.create(timingSetting); } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java
@@ -1,24 +1,31 @@ package com.ruoyi.integration.rocket.listener; import com.alibaba.fastjson.JSONObject; import com.ruoyi.integration.api.model.ConfirmTransactionRecord; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.api.model.TransactionRecord; import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.iotda.enums.ServiceIdMenu; import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce; import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.mongodb.service.TransactionRecordService; import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService; import com.ruoyi.integration.rocket.model.TransactionRecordMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; @Slf4j @@ -27,69 +34,56 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_transaction_record", topic = "charge_transaction_record", selectorExpression = "transaction_record", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "transaction_record" ) public class TransactionRecordMessageListener extends EnhanceMessageHandler<TransactionRecordMessage> implements 
RocketMQListener<TransactionRecordMessage> { public class TransactionRecordMessageListener implements RocketMQListener<TransactionRecordMessage> { @Autowired private TransactionRecordService transactionRecordService; @Autowired private ChargingOrderClient chargingOrderClient; @Override protected void handleMessage(TransactionRecordMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("交易记录-业务消息处理:{}",message); // 持久化消息 TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(message,transactionRecord); transactionRecordService.create(transactionRecord); // 业务处理 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrder)){ chargingOrder.setTotalElectricity(message.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrder); } } @Override protected void handleMaxRetriesExceeded(TransactionRecordMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(TransactionRecordMessage message) { // 此处可做消息过滤 return false; } @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @Resource private RedisTemplate redisTemplate; /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(TransactionRecordMessage message) { super.dispatchMessage(message); log.info("交易记录-业务消息处理:{}",message); message.setResult(JSONObject.toJSONString(message)); // 持久化消息 TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(message,transactionRecord); 
transactionRecord.setResult(message.getResult()); transactionRecordService.create(transactionRecord); // 业务处理 TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderRecord)){ chargingOrderRecord.setTotalElectricity(message.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrderRecord); } //计算费用 TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(message,vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 != code){ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, message.getTransaction_serial_number()); } // 添加实时上传记录结束记录 // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); if(Objects.nonNull(data) && data.getStatus() != 5){ UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); uploadRealTimeMonitoringData.setStatus(5); uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); } } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_update_balance_reply",
        topic = "charge_update_balance_reply",
        selectorExpression = "update_balance_reply"
)
// Listener for balance-update reply messages. It implements RocketMQListener directly;
// the former EnhanceMessageHandler base class (retry/filter hooks and dispatchMessage)
// is no longer part of this class, so onMessage carries the whole handling logic.
public class UpdateBalanceReplyMessageListener implements RocketMQListener<UpdateBalanceReplyMessage> {

    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;

    /**
     * Logs the incoming reply, copies its properties onto a new {@code UpdateBalanceReply}
     * and hands that object to {@code updateBalanceReplyService.create}.
     *
     * @param message the balance-update reply message received by this listener
     */
    @Override
    public void onMessage(UpdateBalanceReplyMessage message) {
        log.info("余额更新应答-业务消息处理:{}", message);
        // Persist the message.
        UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply();
        BeanUtils.copyProperties(message, updateBalanceReply);
        updateBalanceReplyService.create(updateBalanceReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java
@@ -10,8 +10,12 @@ import com.ruoyi.integration.api.feignClient.TCECClient; import com.ruoyi.integration.api.model.Online; import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService; import com.ruoyi.integration.rocket.model.ChargingMessage; import com.ruoyi.integration.rocket.model.ChargingOrderMessage; import com.ruoyi.integration.rocket.model.UploadRealTimeMonitoringDataMessage; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; @@ -36,10 +40,9 @@ messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_upload_real_time_monitoring_data", topic = "charge_upload_real_time_monitoring_data", selectorExpression = "upload_real_time_monitoring_data", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 selectorExpression = "upload_real_time_monitoring_data" ) public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { public class UploadRealTimeMonitoringDataMessageListener implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @@ -48,129 +51,65 @@ private ChargingOrderClient chargingOrderClient; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; @Resource private ChargingGunClient chargingGunClient; @Resource private FaultMessageClient faultMessageClient; @Autowired private EnhanceProduce enhanceProduce; @Resource private TCECClient tcecClient; @Override protected void handleMessage(UploadRealTimeMonitoringDataMessage message) throws Exception { // 
此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 log.info("上传实时监测数据-业务消息处理:{}",message); // 持久化消息 UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(message,uploadRealTimeMonitoringData); // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); // 查询订单 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); // 查询当前时间段的计费策略 TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); if (Objects.nonNull(data)) { uploadRealTimeMonitoringData.setLast_time(data.getLast_time()); uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().divide(data.getPaid_amount())); uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().divide(data.getCharging_degree())); uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); }else { log.info("首次上传实时监测数据"); uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount()); uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree()); uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); } uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); 
chargingOrderClient.chargeMonitoring(query); GetChargingGunByCode code = new GetChargingGunByCode(); code.setCharging_pile_code(message.getCharging_pile_code()); code.setCharging_gun_code(message.getCharging_gun_code()); TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData(); if(Objects.nonNull(chargingGun)){ // 存储状态信息 TFaultMessage faultMessage = new TFaultMessage(); if(message.getCharging_gun_status().equals(0) || message.getCharging_gun_status().equals(1)){ faultMessage.setSiteId(chargingGun.getSiteId()); faultMessage.setChargingPileId(chargingGun.getChargingPileId()); faultMessage.setChargingGunId(chargingGun.getId()); switch (message.getCharging_gun_status()){ case 0: faultMessage.setStatus(1); chargingGun.setStatus(1); break; case 1: faultMessage.setStatus(2); chargingGun.setStatus(7); break; } faultMessage.setDownTime(LocalDateTime.now()); faultMessageClient.createFaultMessage(faultMessage); }else { switch (message.getCharging_gun_status()){ case 2: chargingGun.setStatus(2); break; case 3: chargingGun.setStatus(4); break; } // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录 faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData(); if(Objects.nonNull(faultMessage)){ faultMessage.setEndTime(LocalDateTime.now()); faultMessageClient.updateFaultMessage(faultMessage); } } chargingGunClient.updateChargingGunById(chargingGun); //推送状态给三方平台 tcecClient.pushChargingGunStatus(chargingGun.getFullNumber(), chargingGun.getStatus()); } } @Override protected void handleMaxRetriesExceeded(UploadRealTimeMonitoringDataMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(UploadRealTimeMonitoringDataMessage 
message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(UploadRealTimeMonitoringDataMessage message) { super.dispatchMessage(message); try { log.info("上传实时监测数据-业务消息处理:{}",message); // 持久化消息 UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(message,uploadRealTimeMonitoringData); // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); // 查询订单 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); // 查询当前时间段的计费策略 TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); if (Objects.nonNull(data)) { uploadRealTimeMonitoringDataService.updateById(data.getId()); uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().subtract(data.getPaid_amount())); uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().subtract(data.getCharging_degree())); uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); }else { log.info("首次上传实时监测数据"); uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount()); uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree()); uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); } uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); 
uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); if(i == 0){ log.error("数据存储mongo失败"); } // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); // 订单id ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); // 推送充电订单信息 ChargingMessage chargingMessage4 = new ChargingMessage(); chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); chargingMessage4.setOrderMessage(chargingOrderMessage3); enhanceProduce.orderInfoMessage(chargingMessage4); } catch (Exception e) { e.printStackTrace(); } } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java
@@ -19,62 +19,24 @@
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "charge_working_parameter_setting_reply",
        topic = "charge_working_parameter_setting_reply",
        selectorExpression = "working_parameter_setting_reply"
)
// Listener for working-parameter-setting reply messages. It implements RocketMQListener
// directly; the former EnhanceMessageHandler base class (retry/filter hooks and
// dispatchMessage) is no longer part of this class.
public class WorkingParameterSettingReplyMessageListener implements RocketMQListener<WorkingParameterSettingReplyMessage> {

    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;

    /**
     * Logs the incoming reply, copies its properties onto a new
     * {@code WorkingParameterSettingReply} and hands that object to
     * {@code workingParameterSettingReplyService.create}.
     *
     * @param message the working-parameter-setting reply message received by this listener
     */
    @Override
    public void onMessage(WorkingParameterSettingReplyMessage message) {
        log.info("充电桩工作参数设置应答-业务消息处理:{}", message);
        // Persist the message.
        WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply();
        BeanUtils.copyProperties(message, workingParameterSettingReply);
        workingParameterSettingReplyService.create(workingParameterSettingReply);
    }
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
@@ -1,628 +1,625 @@ package com.ruoyi.integration.rocket.produce; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient; import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; import com.ruoyi.chargingPile.api.model.TChargingGun; import com.ruoyi.chargingPile.api.model.TFaultMessage; import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode; import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.common.redis.service.RedisService; import com.ruoyi.integration.api.model.*; import com.ruoyi.integration.drainage.TCECPushUtil; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.iotda.enums.ServiceIdMenu; import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter; import com.ruoyi.integration.mongodb.service.*; import com.ruoyi.integration.rocket.model.*; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import com.ruoyi.order.api.vo.SecurityDetectionVO; import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.math.RoundingMode; import java.time.LocalDateTime; import java.util.Date; import java.util.Objects; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @Component @RocketMQMessageListener( messageModel = MessageModel.CLUSTERING, consumerGroup = "charge_charging_message", topic = "charge_charging_message", selectorExpression = "charging_message", consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { @Autowired private AcquisitionBillingModeService acquisitionBillingModeService; @Autowired private BillingModeVerifyService billingModeVerifyService; @Autowired private BmsAbortService bmsAbortService; @Resource private ChargingOrderClient chargingOrderClient; @Autowired private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; @Autowired private OnlineService onlineService; @Autowired private PingService pingService; @Autowired private EndChargeService endChargeService; @Autowired private ErrorMessageMessageService errorMessageMessageService; @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; @Autowired private ChargingHandshakeService chargingHandshakeService; @Autowired private ParameterSettingService parameterSettingService; @Autowired private MotorAbortService motorAbortService; @Autowired private BmsInformationService bmsInformationService; 
@Autowired private ChargingPileStartsChargingService chargingPileStartsChargingService; @Autowired private PlatformStartChargingReplyService platformStartChargingReplyService; @Autowired private PlatformStopChargingReplyService platformStopChargingReplyService; @Autowired private TransactionRecordService transactionRecordService; @Autowired private UpdateBalanceReplyService updateBalanceReplyService; @Autowired private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; @Autowired private ClearOfflineCardReplyService clearOfflineCardReplyService; @Autowired private WorkingParameterSettingReplyService workingParameterSettingReplyService; @Autowired private TimingSettingService timingSettingService; @Autowired private SetupBillingModelReplyService setupBillingModelReplyService; @Autowired private GroundLockRealTimeDataService groundLockRealTimeDataService; @Autowired private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; @Autowired private PlatformRestartReplyService platformRestartReplyService; @Autowired private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; @Autowired private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; @Autowired private SecurityDetectionService securityDetectionService; @Autowired private TCECPushUtil tcecPushUtil; @Resource private ChargingPileClient chargingPileClient; @Resource private ChargingGunClient chargingGunClient; @Resource private RedisTemplate redisTemplate; @Autowired private EnhanceProduce enhanceProduce; @StreamListener("input") @Override protected void handleMessage(ChargingMessage message) throws Exception { log.info("rocket收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); log.info("充电桩登录认证业务消息处理:{}",onlineMessage); // 持久化消息 
Online online = new Online(); BeanUtils.copyProperties(onlineMessage,online); onlineService.create(online); break; case SendTagConstant.PING: PingMessage pingMessage = message.getPingMessage(); log.info("充电桩心跳包-业务消息处理:{}",pingMessage); // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(pingMessage,ping); pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor.execute(new Runnable() { @Override public void run() { UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(pingMessage.getCharging_gun_code()); vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); } }); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); log.info("充电结束-业务消息处理:{}",endChargeMessage); // 持久化消息 EndCharge endCharge = new EndCharge(); BeanUtils.copyProperties(endChargeMessage,endCharge); endChargeService.create(endCharge); ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor1.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // 订单id String transactionSerialNumber = endCharge.getTransaction_serial_number(); ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); chargingOrderMessage.setOrderNumber(transactionSerialNumber); // 推送充电订单信息 ChargingMessage chargingMessage1 = new ChargingMessage(); chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); chargingMessage1.setOrderMessage(chargingOrderMessage); 
enhanceProduce.orderInfoMessage(chargingMessage1); // 推送充电订单状态 ChargingMessage chargingMessage2 = new ChargingMessage(); chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); chargingMessage2.setOrderMessage(chargingOrderMessage); enhanceProduce.orderStatusMessage(chargingMessage2); // try { // TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); // tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); // tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); // }catch (Exception e){ // e.printStackTrace(); // System.out.println("充电结束推送监管平台失败:"+e.getMessage()); //package com.ruoyi.integration.rocket.produce; // //import com.alibaba.fastjson.JSON; //import com.alibaba.fastjson.JSONObject; //import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; //import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; //import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; //import com.ruoyi.chargingPile.api.feignClient.FaultMessageClient; //import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; //import com.ruoyi.chargingPile.api.model.TChargingGun; //import com.ruoyi.chargingPile.api.model.TFaultMessage; //import com.ruoyi.chargingPile.api.vo.GetChargingGunByCode; //import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; //import com.ruoyi.common.redis.service.RedisService; //import com.ruoyi.integration.api.model.*; //import com.ruoyi.integration.drainage.TCECPushUtil; //import com.ruoyi.integration.iotda.constant.SendTagConstant; //import com.ruoyi.integration.iotda.enums.ServiceIdMenu; //import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter; //import com.ruoyi.integration.mongodb.service.*; //import com.ruoyi.integration.rocket.model.*; //import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; //import com.ruoyi.order.api.feignClient.ChargingOrderClient; //import 
com.ruoyi.order.api.model.TChargingOrder; //import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; //import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; //import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; //import com.ruoyi.order.api.vo.SecurityDetectionVO; //import com.ruoyi.order.api.vo.TransactionRecordMessageVO; //import lombok.extern.slf4j.Slf4j; //import org.apache.rocketmq.spring.annotation.MessageModel; //import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; //import org.apache.rocketmq.spring.core.RocketMQListener; //import org.springframework.beans.BeanUtils; //import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.stereotype.Component; //import org.springframework.util.StringUtils; // //import javax.annotation.Resource; //import java.math.RoundingMode; //import java.time.LocalDateTime; //import java.util.Date; //import java.util.Objects; //import java.util.Set; //import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ThreadPoolExecutor; //import java.util.concurrent.TimeUnit; // //@Slf4j //@Component //@RocketMQMessageListener( // messageModel = MessageModel.CLUSTERING, // consumerGroup = "charge_charging_message", // topic = "charge_charging_message", // selectorExpression = "charging_message", // consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 //) //public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { // // @Autowired // private AcquisitionBillingModeService acquisitionBillingModeService; // @Autowired // private BillingModeVerifyService billingModeVerifyService; // @Autowired // private BmsAbortService bmsAbortService; // @Resource // private ChargingOrderClient chargingOrderClient; // @Autowired // private BmsDemandAndChargerExportationService 
bmsDemandAndChargerExportationService; // @Autowired // private OnlineService onlineService; // @Autowired // private PingService pingService; // @Autowired // private EndChargeService endChargeService; // @Autowired // private ErrorMessageMessageService errorMessageMessageService; // @Autowired // private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; // @Resource // private AccountingStrategyDetailClient accountingStrategyDetailClient; // @Autowired // private ChargingHandshakeService chargingHandshakeService; // @Autowired // private ParameterSettingService parameterSettingService; // @Autowired // private MotorAbortService motorAbortService; // @Autowired // private BmsInformationService bmsInformationService; // @Autowired // private ChargingPileStartsChargingService chargingPileStartsChargingService; // @Autowired // private PlatformStartChargingReplyService platformStartChargingReplyService; // @Autowired // private PlatformStopChargingReplyService platformStopChargingReplyService; // @Autowired // private TransactionRecordService transactionRecordService; // @Autowired // private UpdateBalanceReplyService updateBalanceReplyService; // @Autowired // private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; // @Autowired // private ClearOfflineCardReplyService clearOfflineCardReplyService; // @Autowired // private WorkingParameterSettingReplyService workingParameterSettingReplyService; // @Autowired // private TimingSettingService timingSettingService; // @Autowired // private SetupBillingModelReplyService setupBillingModelReplyService; // @Autowired // private GroundLockRealTimeDataService groundLockRealTimeDataService; // @Autowired // private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; // @Autowired // private PlatformRestartReplyService platformRestartReplyService; // @Autowired // private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; // @Autowired 
// private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; // @Autowired // private SecurityDetectionService securityDetectionService; // @Autowired // private TCECPushUtil tcecPushUtil; // // @Resource // private ChargingPileClient chargingPileClient; // @Resource // private ChargingGunClient chargingGunClient; // // @Resource // private RedisTemplate redisTemplate; // // @Autowired // private EnhanceProduce enhanceProduce; // // // // @Override // protected void handleMessage(ChargingMessage message) throws Exception { // log.info("rocket收到的消息内容:{}",message); // String serviceId = message.getServiceId(); // if(!StringUtils.hasLength(serviceId)){ // return; // } // log.info("rocket收到的消息内容:{} {}", serviceId,message); // switch (serviceId){ // case SendTagConstant.ONLINE: // OnlineMessage onlineMessage = message.getOnlineMessage(); // log.info("充电桩登录认证业务消息处理:{}",onlineMessage); // // 持久化消息 // Online online = new Online(); // BeanUtils.copyProperties(onlineMessage,online); // onlineService.create(online); // break; // case SendTagConstant.PING: // PingMessage pingMessage = message.getPingMessage(); // log.info("充电桩心跳包-业务消息处理:{}",pingMessage); // // 持久化消息 // Ping ping = new Ping(); // BeanUtils.copyProperties(pingMessage,ping); // pingService.save(ping); // //存储缓存中,5分钟有效 // redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); // ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor.execute(new Runnable() { // @Override // public void run() { // UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); // vo1.setGun_code(pingMessage.getCharging_gun_code()); // vo1.setPile_code(pingMessage.getCharging_pile_code()); // vo1.setStatus(pingMessage.getCharging_gun_status()); // chargingPileClient.updateChargingPileStatus(vo1); // } // }); // break; // case SendTagConstant.END_CHARGE: 
// EndChargeMessage endChargeMessage = message.getEndChargeMessage(); // log.info("充电结束-业务消息处理:{}",endChargeMessage); // // 持久化消息 // EndCharge endCharge = new EndCharge(); // BeanUtils.copyProperties(endChargeMessage,endCharge); // endChargeService.create(endCharge); // ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor1.execute(new Runnable() { // @Override // public void run() { // // 业务处理 // chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // // 订单id // String transactionSerialNumber = endCharge.getTransaction_serial_number(); // ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); // chargingOrderMessage.setOrderNumber(transactionSerialNumber); // // 推送充电订单信息 // ChargingMessage chargingMessage1 = new ChargingMessage(); // chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); // chargingMessage1.setOrderMessage(chargingOrderMessage); // enhanceProduce.orderInfoMessage(chargingMessage1); // // 推送充电订单状态 // ChargingMessage chargingMessage2 = new ChargingMessage(); // chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); // chargingMessage2.setOrderMessage(chargingOrderMessage); // enhanceProduce.orderStatusMessage(chargingMessage2); //// try { //// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); //// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); //// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); //// }catch (Exception e){ //// e.printStackTrace(); //// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); //// } // } // }); // break; // case SendTagConstant.ERROR_MESSAGE: // ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); // log.info("错误报文-业务消息处理:{}",errorMessageMessage1); // // 持久化消息 // ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); // 
BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); // errorMessageMessageService.create(errorMessageMessage); // break; // case SendTagConstant.BILLING_MODE_VERIFY: // BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); // log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); // // 持久化消息 // BillingModeVerify billingModeVerify = new BillingModeVerify(); // BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); // billingModeVerifyService.create(billingModeVerify); // break; // case SendTagConstant.ACQUISITION_BILLING_MODE: // AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); // log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); // // 持久化消息 // AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); // BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); // acquisitionBillingModeService.create(acquisitionBillingMode); // break; // case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: // try { // UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); // log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); // // 持久化消息 // UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); // BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); // // 查询mogondb上一条数据 // UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); // // 查询订单 // TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); // // 查询当前时间段的计费策略 // TAccountingStrategyDetail accountingStrategyDetail = 
accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); // uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); // uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); // if (Objects.nonNull(data)) { // uploadRealTimeMonitoringDataService.updateById(data.getId()); // uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); // uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); // uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); // }else { // log.info("首次上传实时监测数据"); // uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); // uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); // uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); // } // uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); // uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); // uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); //// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); //// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); // int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); // if(i == 0){ // log.error("数据存储mongo失败"); // } // // ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 
// threadPoolExecutor2.execute(new Runnable() { // @Override // public void run() { // // 业务处理 // UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); // BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); // chargingOrderClient.chargeMonitoring(query); // chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); // ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); // chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); // chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); // // 推送充电订单信息 // ChargingMessage chargingMessage4 = new ChargingMessage(); // chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); // chargingMessage4.setOrderMessage(chargingOrderMessage3); // enhanceProduce.orderInfoMessage(chargingMessage4); //// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); // } } }); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); log.info("错误报文-业务消息处理:{}",errorMessageMessage1); // 持久化消息 ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); errorMessageMessageService.create(errorMessageMessage); break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); // 持久化消息 BillingModeVerify billingModeVerify = new BillingModeVerify(); BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); billingModeVerifyService.create(billingModeVerify); break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); // 持久化消息 AcquisitionBillingMode acquisitionBillingMode = new 
AcquisitionBillingMode(); BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); acquisitionBillingModeService.create(acquisitionBillingMode); break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: try { UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); // 持久化消息 UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); // 查询订单 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); // 查询当前时间段的计费策略 TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); if (Objects.nonNull(data)) { uploadRealTimeMonitoringDataService.updateById(data.getId()); uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); }else { log.info("首次上传实时监测数据"); 
uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); } uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); // uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); // uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); if(i == 0){ log.error("数据存储mongo失败"); } ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor2.execute(new Runnable() { @Override public void run() { // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); chargingOrderMessage3.setSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); // 推送充电订单信息 ChargingMessage chargingMessage4 = new ChargingMessage(); chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); chargingMessage4.setOrderMessage(chargingOrderMessage3); enhanceProduce.orderInfoMessage(chargingMessage4); // tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); } }); } catch (Exception e) { e.printStackTrace(); } break; case 
SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); // 持久化消息 ChargingHandshake chargingHandshake = new ChargingHandshake(); BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); chargingHandshakeService.create(chargingHandshake); break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); log.info("业务消息处理:{}",parameterSettingMessage); // 持久化消息 ParameterSetting parameterSetting = new ParameterSetting(); BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); parameterSettingService.create(parameterSetting); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); // 持久化消息 BmsAbort bmsAbort = new BmsAbort(); BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); bmsAbortService.create(bmsAbort); ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor3.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); } }); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); // 持久化消息 MotorAbort motorAbort = new MotorAbort(); BeanUtils.copyProperties(motorAbortMessage,motorAbort); motorAbortService.create(motorAbort); ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor4.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); } }); break; case 
SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); // 持久化消息 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor5.execute(new Runnable() { @Override public void run() { // 业务处理 TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderBms)){ chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrderBms); } } }); break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); // 持久化消息 BmsInformation bmsInformation = new BmsInformation(); BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); bmsInformationService.create(bmsInformation); break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); // 持久化消息 ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); chargingPileStartsChargingService.create(chargingPileStartsCharging); break; case 
SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); // 持久化消息 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor6.execute(new Runnable() { @Override public void run() { // 业务处理 PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); chargingOrderClient.startChargeSuccessfully(message1); } }); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); // 持久化消息 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor7.execute(new Runnable() { @Override public void run() { PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); } }); break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage 
transactionRecordMessage = message.getTransactionRecordMessage(); log.info("交易记录-业务消息处理:{}",transactionRecordMessage); transactionRecordMessage.setResult(JSONObject.toJSONString(message)); // 持久化消息 TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); transactionRecordService.create(transactionRecord); ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor8.execute(new Runnable() { @Override public void run() { // 业务处理 TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderRecord)){ chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrderRecord); } //计算费用 TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(transactionRecordMessage,vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 != code){ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); } } }); // 添加实时上传记录结束记录 // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); if(Objects.nonNull(data) && data.getStatus() != 5){ UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); uploadRealTimeMonitoringData.setStatus(5); uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); } break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); 
log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); // 持久化消息 UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); updateBalanceReplyService.create(updateBalanceReply); break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); // 持久化消息 SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); // 持久化消息 ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); clearOfflineCardReplyService.create(clearOfflineCardReply); break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); // 持久化消息 WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); workingParameterSettingReplyService.create(workingParameterSettingReply); break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); log.info("对时设置-业务消息处理:{}",timingSettingMessage); // 持久化消息 TimingSetting timingSetting = new TimingSetting(); 
BeanUtils.copyProperties(timingSettingMessage,timingSetting); timingSettingService.create(timingSetting); break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); // 持久化消息 SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); setupBillingModelReplyService.create(setupBillingModelReply); break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); // 持久化消息 GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); groundLockRealTimeDataService.create(groundLockRealTimeData); break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); // 持久化消息 ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); // 持久化消息 PlatformRestartReply platformRestartReply = new PlatformRestartReply(); BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); 
platformRestartReplyService.create(platformRestartReply); break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); log.info("安全监测-业务消息处理:{}",securityDetectionMessage); SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(securityDetectionMessage,securityDetection); securityDetectionService.create(securityDetection); ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor9.execute(new Runnable() { @Override public void run() { SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); BeanUtils.copyProperties(securityDetection, securityDetection1); chargingOrderClient.securityDetection(securityDetection1); } }); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); break; } } @Override protected void handleMaxRetriesExceeded(ChargingMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 
return false; } /** * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ @Override protected boolean filter(ChargingMessage message) { // 此处可做消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(ChargingMessage message) { super.dispatchMessage(message); } /** * 处理未正常完成费用计算的订单 */ public void transactionRecord(){ Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD); for (String member : members) { TransactionRecord transactionRecord = transactionRecordService.findOne(member); if(null == transactionRecord){ redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); }else{ TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(transactionRecord, vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 == code){ redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); } } } } } // }); // } catch (Exception e) { // e.printStackTrace(); // } // break; // case SendTagConstant.CHARGING_HANDSHAKE: // ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); // log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); // // 持久化消息 // ChargingHandshake chargingHandshake = new ChargingHandshake(); // BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); // chargingHandshakeService.create(chargingHandshake); // break; // case SendTagConstant.PARAMETER_SETTING: // ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); // log.info("业务消息处理:{}",parameterSettingMessage); // // 持久化消息 // ParameterSetting parameterSetting = new ParameterSetting(); // BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); // parameterSettingService.create(parameterSetting); // break; // case SendTagConstant.BMS_ABORT: // BmsAbortMessage bmsAbortMessage = 
message.getBmsAbortMessage(); // log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); // // 持久化消息 // BmsAbort bmsAbort = new BmsAbort(); // BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); // bmsAbortService.create(bmsAbort); // // ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor3.execute(new Runnable() { // @Override // public void run() { // // 业务处理 // chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); // } // }); // break; // case SendTagConstant.MOTOR_ABORT: // MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); // log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); // // 持久化消息 // MotorAbort motorAbort = new MotorAbort(); // BeanUtils.copyProperties(motorAbortMessage,motorAbort); // motorAbortService.create(motorAbort); // ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor4.execute(new Runnable() { // @Override // public void run() { // // 业务处理 // chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); // } // }); // break; // case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: // BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); // log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); // // 持久化消息 // BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); // BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); // bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor5.execute(new Runnable() { // @Override // public 
void run() { // // 业务处理 // TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); // if(Objects.nonNull(chargingOrderBms)){ // chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); // chargingOrderClient.updateChargingOrder(chargingOrderBms); // } // } // }); // break; // case SendTagConstant.BMS_INFORMATION: // BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); // log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); // // 持久化消息 // BmsInformation bmsInformation = new BmsInformation(); // BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); // bmsInformationService.create(bmsInformation); // break; // case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: // ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); // log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); // // 持久化消息 // ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); // BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); // chargingPileStartsChargingService.create(chargingPileStartsCharging); // break; // case SendTagConstant.PLATFORM_START_CHARGING_REPLY: // PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); // log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); // // 持久化消息 // PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); // BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); // platformStartChargingReplyService.create(platformStartChargingReply); // ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 
threadPoolExecutor6.execute(new Runnable() { // @Override // public void run() { // // 业务处理 // PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); // BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); // chargingOrderClient.startChargeSuccessfully(message1); // } // }); // break; // case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: // PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); // log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); // // 持久化消息 // PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); // BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); // platformStopChargingReplyService.create(platformStopChargingReply); // ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor7.execute(new Runnable() { // @Override // public void run() { // PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); // BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); // chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); // } // }); // break; // case SendTagConstant.TRANSACTION_RECORD: // TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); // log.info("交易记录-业务消息处理:{}",transactionRecordMessage); // transactionRecordMessage.setResult(JSONObject.toJSONString(message)); // // 持久化消息 // TransactionRecord transactionRecord = new TransactionRecord(); // BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); // transactionRecordService.create(transactionRecord); // ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 
threadPoolExecutor8.execute(new Runnable() { // @Override // public void run() { // // 业务处理 // TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); // if(Objects.nonNull(chargingOrderRecord)){ // chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); // chargingOrderClient.updateChargingOrder(chargingOrderRecord); // } // //计算费用 // TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); // BeanUtils.copyProperties(transactionRecordMessage,vo); // int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); // if(200 != code){ // //失败后添加到队列中继续处理数据 // redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); // } // } // }); // // // // 添加实时上传记录结束记录 // // 查询mogondb上一条数据 // UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); // if(Objects.nonNull(data) && data.getStatus() != 5){ // UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); // BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); // uploadRealTimeMonitoringData.setStatus(5); // uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); // } // break; // case SendTagConstant.UPDATE_BALANCE_REPLY: // UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); // log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); // // 持久化消息 // UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); // BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); // updateBalanceReplyService.create(updateBalanceReply); // break; // case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: // SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); // 
log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); // // 持久化消息 // SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); // BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); // synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); // break; // case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: // ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); // log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); // // 持久化消息 // ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); // BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); // clearOfflineCardReplyService.create(clearOfflineCardReply); // break; // case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: // WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); // log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); // // 持久化消息 // WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); // BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); // workingParameterSettingReplyService.create(workingParameterSettingReply); // break; // case SendTagConstant.TIMING_SETTING: // TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); // log.info("对时设置-业务消息处理:{}",timingSettingMessage); // // 持久化消息 // TimingSetting timingSetting = new TimingSetting(); // BeanUtils.copyProperties(timingSettingMessage,timingSetting); // timingSettingService.create(timingSetting); // break; // case SendTagConstant.SETUP_BILLING_MODEL_REPLY: // SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); // log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); // // 持久化消息 // SetupBillingModelReply 
setupBillingModelReply = new SetupBillingModelReply(); // BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); // setupBillingModelReplyService.create(setupBillingModelReply); // break; // case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: // GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); // log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); // // 持久化消息 // GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); // BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); // groundLockRealTimeDataService.create(groundLockRealTimeData); // break; // case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: // ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); // log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); // // 持久化消息 // ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); // BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); // chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); // break; // case SendTagConstant.PLATFORM_RESTART_REPLY: // PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); // log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); // // 持久化消息 // PlatformRestartReply platformRestartReply = new PlatformRestartReply(); // BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); // platformRestartReplyService.create(platformRestartReply); // break; // case SendTagConstant.QR_CODE_DELIVERY_REPLY: // QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); // log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); // QrCodeDeliveryReply 
qrCodeDeliveryReply = new QrCodeDeliveryReply(); // BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); // qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); // break; // case SendTagConstant.SECURITY_DETECTION: // SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); // log.info("安全监测-业务消息处理:{}",securityDetectionMessage); // SecurityDetection securityDetection = new SecurityDetection(); // BeanUtils.copyProperties(securityDetectionMessage,securityDetection); // securityDetectionService.create(securityDetection); // ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor9.execute(new Runnable() { // @Override // public void run() { // SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); // BeanUtils.copyProperties(securityDetection, securityDetection1); // chargingOrderClient.securityDetection(securityDetection1); // } // }); // break; // default: // PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); // log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); // PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); // BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); // platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); // break; // } // } // // @Override // protected void handleMaxRetriesExceeded(ChargingMessage message) { // // 当超过指定重试次数消息时此处方法会被调用 // // 生产中可以进行回退或其他业务操作 // log.error("消息消费失败,请执行后续处理"); // } // // // /** // * 是否执行重试机制 // */ // @Override // protected boolean isRetry() { // return true; // } // // @Override // protected boolean throwException() { // // 是否抛出异常,false搭配retry自行处理异常 // return false; // } // // /** // * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 // * @param message 待处理消息 // * @return true: 本次消息被过滤,false:不过滤 // */ // 
@Override // protected boolean filter(ChargingMessage message) { // // 此处可做消息过滤 // return false; // } // // /** // * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 // */ // @Override // public void onMessage(ChargingMessage message) { // super.dispatchMessage(message); // } // // // /** // * 处理未正常完成费用计算的订单 // */ // public void transactionRecord(){ // Set<String> members = redisTemplate.opsForSet().members(SendTagConstant.TRANSACTION_RECORD); // for (String member : members) { // TransactionRecord transactionRecord = transactionRecordService.findOne(member); // if(null == transactionRecord){ // redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); // }else{ // TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); // BeanUtils.copyProperties(transactionRecord, vo); // int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); // if(200 == code){ // redisTemplate.opsForSet().remove(SendTagConstant.TRANSACTION_RECORD, member); // } // } // } // } //} ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
@@ -1,527 +1,522 @@ package com.ruoyi.integration.rocket.produce; import com.alibaba.fastjson.JSONObject; import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; import com.ruoyi.chargingPile.api.model.TChargingGun; import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.integration.api.model.*; import com.ruoyi.integration.drainage.TCECPushUtil; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.*; import com.ruoyi.integration.rocket.model.*; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import com.ruoyi.order.api.vo.SecurityDetectionVO; import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.math.RoundingMode; import java.util.Objects; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import 
java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @Component public class ChargingMessageUtil { @Autowired private AcquisitionBillingModeService acquisitionBillingModeService; @Autowired private BillingModeVerifyService billingModeVerifyService; @Autowired private BmsAbortService bmsAbortService; @Resource private ChargingOrderClient chargingOrderClient; @Autowired private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; @Autowired private OnlineService onlineService; @Autowired private PingService pingService; @Autowired private EndChargeService endChargeService; @Autowired private ErrorMessageMessageService errorMessageMessageService; @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; @Autowired private ChargingHandshakeService chargingHandshakeService; @Autowired private ParameterSettingService parameterSettingService; @Autowired private MotorAbortService motorAbortService; @Autowired private BmsInformationService bmsInformationService; @Autowired private ChargingPileStartsChargingService chargingPileStartsChargingService; @Autowired private PlatformStartChargingReplyService platformStartChargingReplyService; @Autowired private PlatformStopChargingReplyService platformStopChargingReplyService; @Autowired private TransactionRecordService transactionRecordService; @Autowired private UpdateBalanceReplyService updateBalanceReplyService; @Autowired private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; @Autowired private ClearOfflineCardReplyService clearOfflineCardReplyService; @Autowired private WorkingParameterSettingReplyService workingParameterSettingReplyService; @Autowired private TimingSettingService timingSettingService; @Autowired private SetupBillingModelReplyService setupBillingModelReplyService; @Autowired private GroundLockRealTimeDataService 
groundLockRealTimeDataService; @Autowired private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; @Autowired private PlatformRestartReplyService platformRestartReplyService; @Autowired private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; @Autowired private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; @Autowired private SecurityDetectionService securityDetectionService; @Autowired private TCECPushUtil tcecPushUtil; @Resource private ChargingPileClient chargingPileClient; @Resource private ChargingGunClient chargingGunClient; @Resource private RedisTemplate redisTemplate; @Autowired private EnhanceProduce enhanceProduce; public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ log.info("rocket收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); log.info("充电桩登录认证业务消息处理:{}",onlineMessage); // 持久化消息 Online online = new Online(); BeanUtils.copyProperties(onlineMessage,online); onlineService.create(online); break; case SendTagConstant.PING: PingMessage pingMessage = message.getPingMessage(); log.info("充电桩心跳包-业务消息处理:{}",pingMessage); // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(pingMessage,ping); pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(pingMessage.getCharging_gun_code()); vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); // 监管平台推送充电设备状态 SendResult sendResult; String gunCode = pingMessage.getCharging_pile_code() + 
pingMessage.getCharging_gun_code(); ChargingMessage chargingMessage = new ChargingMessage(); chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); GunStatusMessage gunStatusMessage = new GunStatusMessage(); gunStatusMessage.setFullNumber(gunCode); chargingMessage.setGunStatusMessage(gunStatusMessage); sendResult = enhanceProduce.gunStatusMessage(chargingMessage); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); log.info("充电结束-业务消息处理:{}",endChargeMessage); // 持久化消息 EndCharge endCharge = new EndCharge(); BeanUtils.copyProperties(endChargeMessage,endCharge); endChargeService.create(endCharge); // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // 订单id String transactionSerialNumber = endCharge.getTransaction_serial_number(); ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); chargingOrderMessage.setOrderNumber(transactionSerialNumber); // 推送充电订单信息 ChargingMessage chargingMessage1 = new ChargingMessage(); chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); chargingMessage1.setOrderMessage(chargingOrderMessage); enhanceProduce.orderInfoMessage(chargingMessage1); // 推送充电订单状态 ChargingMessage chargingMessage2 = new ChargingMessage(); chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); chargingMessage2.setOrderMessage(chargingOrderMessage); enhanceProduce.orderStatusMessage(chargingMessage2); // ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor1.execute(new Runnable() { //package com.ruoyi.integration.rocket.produce; // //import com.alibaba.fastjson.JSONObject; //import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; //import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; //import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; //import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; 
//import com.ruoyi.chargingPile.api.model.TChargingGun; //import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; //import com.ruoyi.integration.api.model.*; //import com.ruoyi.integration.drainage.TCECPushUtil; //import com.ruoyi.integration.iotda.constant.SendTagConstant; //import com.ruoyi.integration.mongodb.service.*; //import com.ruoyi.integration.rocket.model.*; //import com.ruoyi.order.api.feignClient.ChargingOrderClient; //import com.ruoyi.order.api.model.TChargingOrder; //import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; //import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; //import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; //import com.ruoyi.order.api.vo.SecurityDetectionVO; //import com.ruoyi.order.api.vo.TransactionRecordMessageVO; //import lombok.extern.slf4j.Slf4j; //import org.apache.rocketmq.client.producer.SendResult; //import org.springframework.beans.BeanUtils; //import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.stereotype.Component; //import org.springframework.util.StringUtils; // //import javax.annotation.Resource; //import java.math.RoundingMode; //import java.util.Objects; //import java.util.Set; //import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ThreadPoolExecutor; //import java.util.concurrent.TimeUnit; // // //@Slf4j //@Component //public class ChargingMessageUtil { // // @Autowired // private AcquisitionBillingModeService acquisitionBillingModeService; // @Autowired // private BillingModeVerifyService billingModeVerifyService; // @Autowired // private BmsAbortService bmsAbortService; // @Resource // private ChargingOrderClient chargingOrderClient; // @Autowired // private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; // @Autowired // private OnlineService onlineService; // @Autowired // private PingService pingService; // 
@Autowired // private EndChargeService endChargeService; // @Autowired // private ErrorMessageMessageService errorMessageMessageService; // @Autowired // private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; // @Resource // private AccountingStrategyDetailClient accountingStrategyDetailClient; // @Autowired // private ChargingHandshakeService chargingHandshakeService; // @Autowired // private ParameterSettingService parameterSettingService; // @Autowired // private MotorAbortService motorAbortService; // @Autowired // private BmsInformationService bmsInformationService; // @Autowired // private ChargingPileStartsChargingService chargingPileStartsChargingService; // @Autowired // private PlatformStartChargingReplyService platformStartChargingReplyService; // @Autowired // private PlatformStopChargingReplyService platformStopChargingReplyService; // @Autowired // private TransactionRecordService transactionRecordService; // @Autowired // private UpdateBalanceReplyService updateBalanceReplyService; // @Autowired // private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; // @Autowired // private ClearOfflineCardReplyService clearOfflineCardReplyService; // @Autowired // private WorkingParameterSettingReplyService workingParameterSettingReplyService; // @Autowired // private TimingSettingService timingSettingService; // @Autowired // private SetupBillingModelReplyService setupBillingModelReplyService; // @Autowired // private GroundLockRealTimeDataService groundLockRealTimeDataService; // @Autowired // private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; // @Autowired // private PlatformRestartReplyService platformRestartReplyService; // @Autowired // private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; // @Autowired // private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; // @Autowired // private SecurityDetectionService securityDetectionService; // 
@Autowired // private TCECPushUtil tcecPushUtil; // // @Resource // private ChargingPileClient chargingPileClient; // @Resource // private ChargingGunClient chargingGunClient; // // @Resource // private RedisTemplate redisTemplate; // @Autowired // private EnhanceProduce enhanceProduce; // // // // // // public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ // log.info("rocket收到的消息内容:{}",message); // String serviceId = message.getServiceId(); // if(!StringUtils.hasLength(serviceId)){ // return; // } // log.info("rocket收到的消息内容:{} {}", serviceId,message); // switch (serviceId){ // case SendTagConstant.ONLINE: // OnlineMessage onlineMessage = message.getOnlineMessage(); // log.info("充电桩登录认证业务消息处理:{}",onlineMessage); // // 持久化消息 // Online online = new Online(); // BeanUtils.copyProperties(onlineMessage,online); // onlineService.create(online); // break; // case SendTagConstant.PING: // PingMessage pingMessage = message.getPingMessage(); // log.info("充电桩心跳包-业务消息处理:{}",pingMessage); // // 持久化消息 // Ping ping = new Ping(); // BeanUtils.copyProperties(pingMessage,ping); // pingService.save(ping); // //存储缓存中,5分钟有效 // redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); // // UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); // vo1.setGun_code(pingMessage.getCharging_gun_code()); // vo1.setPile_code(pingMessage.getCharging_pile_code()); // vo1.setStatus(pingMessage.getCharging_gun_status()); // chargingPileClient.updateChargingPileStatus(vo1); // // 监管平台推送充电设备状态 // SendResult sendResult; // String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); // ChargingMessage chargingMessage = new ChargingMessage(); // chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); // GunStatusMessage gunStatusMessage = new GunStatusMessage(); // gunStatusMessage.setFullNumber(gunCode); // 
chargingMessage.setGunStatusMessage(gunStatusMessage); // sendResult = enhanceProduce.gunStatusMessage(chargingMessage); // // break; // case SendTagConstant.END_CHARGE: // EndChargeMessage endChargeMessage = message.getEndChargeMessage(); // log.info("充电结束-业务消息处理:{}",endChargeMessage); // // 持久化消息 // EndCharge endCharge = new EndCharge(); // BeanUtils.copyProperties(endChargeMessage,endCharge); // endChargeService.create(endCharge); // // 业务处理 // chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // // 订单id // String transactionSerialNumber = endCharge.getTransaction_serial_number(); // ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); // chargingOrderMessage.setOrderNumber(transactionSerialNumber); // // 推送充电订单信息 // ChargingMessage chargingMessage1 = new ChargingMessage(); // chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); // chargingMessage1.setOrderMessage(chargingOrderMessage); // enhanceProduce.orderInfoMessage(chargingMessage1); // // 推送充电订单状态 // ChargingMessage chargingMessage2 = new ChargingMessage(); // chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); // chargingMessage2.setOrderMessage(chargingOrderMessage); // enhanceProduce.orderStatusMessage(chargingMessage2); //// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); //// threadPoolExecutor1.execute(new Runnable() { //// @Override //// public void run() { //// try { //// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); //// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); //// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); //// }catch (Exception e){ //// e.printStackTrace(); //// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); //// } //// } //// }); // break; // case SendTagConstant.ERROR_MESSAGE: // ErrorMessageMessage 
errorMessageMessage1 = message.getErrorMessageMessage(); // log.info("错误报文-业务消息处理:{}",errorMessageMessage1); // // 持久化消息 // ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); // BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); // errorMessageMessageService.create(errorMessageMessage); // break; // case SendTagConstant.BILLING_MODE_VERIFY: // BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); // log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); // // 持久化消息 // BillingModeVerify billingModeVerify = new BillingModeVerify(); // BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); // billingModeVerifyService.create(billingModeVerify); // break; // case SendTagConstant.ACQUISITION_BILLING_MODE: // AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); // log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); // // 持久化消息 // AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); // BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); // acquisitionBillingModeService.create(acquisitionBillingMode); // break; // case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: // try { // UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); // log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); // // 持久化消息 // UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); // BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); // // 查询mogondb上一条数据 // UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); // // 查询订单 // TChargingOrder chargingOrder = 
chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); // // 查询当前时间段的计费策略 // TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); // uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); // uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); // if (Objects.nonNull(data)) { // uploadRealTimeMonitoringDataService.updateById(data.getId()); // uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); // uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); // uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); // }else { // log.info("首次上传实时监测数据"); // uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); // uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); // uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); // } // uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); // uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); // uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); //// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); //// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); // int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); 
// if(i == 0){ // log.error("数据存储mongo失败"); // } // // // 业务处理 // UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); // BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); // chargingOrderClient.chargeMonitoring(query); // // 订单id // ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); // chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); // // 推送充电订单信息 // ChargingMessage chargingMessage4 = new ChargingMessage(); // chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); // chargingMessage4.setOrderMessage(chargingOrderMessage3); // enhanceProduce.orderInfoMessage(chargingMessage4); //// ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); //// threadPoolExecutor2.execute(new Runnable() { //// @Override //// public void run() { //// chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); //// //// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); //// } //// }); // } catch (Exception e) { // e.printStackTrace(); // } // break; // case SendTagConstant.CHARGING_HANDSHAKE: // ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); // log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); // // 持久化消息 // ChargingHandshake chargingHandshake = new ChargingHandshake(); // BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); // chargingHandshakeService.create(chargingHandshake); // break; // case SendTagConstant.PARAMETER_SETTING: // ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); // log.info("业务消息处理:{}",parameterSettingMessage); // // 持久化消息 // ParameterSetting parameterSetting = new ParameterSetting(); // BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); // parameterSettingService.create(parameterSetting); // break; // case SendTagConstant.BMS_ABORT: // BmsAbortMessage 
bmsAbortMessage = message.getBmsAbortMessage(); // log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); // // 持久化消息 // BmsAbort bmsAbort = new BmsAbort(); // BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); // bmsAbortService.create(bmsAbort); // // ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor3.execute(new Runnable() { // @Override // public void run() { // try { // TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); // tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); // tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); // }catch (Exception e){ // e.printStackTrace(); // System.out.println("充电结束推送监管平台失败:"+e.getMessage()); // } // // 业务处理 // chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); // } // }); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); log.info("错误报文-业务消息处理:{}",errorMessageMessage1); // 持久化消息 ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); errorMessageMessageService.create(errorMessageMessage); break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); // 持久化消息 BillingModeVerify billingModeVerify = new BillingModeVerify(); BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); billingModeVerifyService.create(billingModeVerify); break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); // 持久化消息 AcquisitionBillingMode acquisitionBillingMode 
= new AcquisitionBillingMode(); BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); acquisitionBillingModeService.create(acquisitionBillingMode); break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: try { UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); // 持久化消息 UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); // 查询订单 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); // 查询当前时间段的计费策略 TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); if (Objects.nonNull(data)) { uploadRealTimeMonitoringDataService.updateById(data.getId()); uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); }else { log.info("首次上传实时监测数据"); 
uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); } uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); // uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); // uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); if(i == 0){ log.error("数据存储mongo失败"); } // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); // 订单id ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); // 推送充电订单信息 ChargingMessage chargingMessage4 = new ChargingMessage(); chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); chargingMessage4.setOrderMessage(chargingOrderMessage3); enhanceProduce.orderInfoMessage(chargingMessage4); // ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // threadPoolExecutor2.execute(new Runnable() { // @Override // public void run() { // chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); // break; // case SendTagConstant.MOTOR_ABORT: // MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); // log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); // // 持久化消息 // MotorAbort motorAbort = new 
MotorAbort(); // BeanUtils.copyProperties(motorAbortMessage,motorAbort); // motorAbortService.create(motorAbort); // // 业务处理 // chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); // break; // case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: // BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); // log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); // // 持久化消息 // BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); // BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); // bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // // tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); // } // }); } catch (Exception e) { e.printStackTrace(); } break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); // 持久化消息 ChargingHandshake chargingHandshake = new ChargingHandshake(); BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); chargingHandshakeService.create(chargingHandshake); break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); log.info("业务消息处理:{}",parameterSettingMessage); // 持久化消息 ParameterSetting parameterSetting = new ParameterSetting(); BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); parameterSettingService.create(parameterSetting); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); // 持久化消息 BmsAbort bmsAbort = new BmsAbort(); BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); bmsAbortService.create(bmsAbort); ThreadPoolExecutor 
threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor3.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); } }); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); // 持久化消息 MotorAbort motorAbort = new MotorAbort(); BeanUtils.copyProperties(motorAbortMessage,motorAbort); motorAbortService.create(motorAbort); // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); // 持久化消息 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // 业务处理 TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderBms)){ chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrderBms); } break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); // 持久化消息 BmsInformation bmsInformation = new BmsInformation(); BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); bmsInformationService.create(bmsInformation); break; case 
SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); // 持久化消息 ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); chargingPileStartsChargingService.create(chargingPileStartsCharging); break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); // 持久化消息 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); // 业务处理 PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); chargingOrderClient.startChargeSuccessfully(message1); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); // 持久化消息 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); break; case 
SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); log.info("交易记录-业务消息处理:{}",transactionRecordMessage); transactionRecordMessage.setResult(JSONObject.toJSONString(message)); // 持久化消息 TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); transactionRecordService.create(transactionRecord); // 业务处理 TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderRecord)){ chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrderRecord); } //计算费用 TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(transactionRecordMessage,vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 != code){ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); } // 添加实时上传记录结束记录 // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); if(Objects.nonNull(data) && data.getStatus() != 5){ UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); uploadRealTimeMonitoringData.setStatus(5); uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); } break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); // 持久化消息 UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); 
BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); updateBalanceReplyService.create(updateBalanceReply); break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); // 持久化消息 SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); // 持久化消息 ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); clearOfflineCardReplyService.create(clearOfflineCardReply); break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); // 持久化消息 WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); workingParameterSettingReplyService.create(workingParameterSettingReply); break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); log.info("对时设置-业务消息处理:{}",timingSettingMessage); // 持久化消息 TimingSetting timingSetting = new TimingSetting(); BeanUtils.copyProperties(timingSettingMessage,timingSetting); timingSettingService.create(timingSetting); break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: 
SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); // 持久化消息 SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); setupBillingModelReplyService.create(setupBillingModelReply); break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); // 持久化消息 GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); groundLockRealTimeDataService.create(groundLockRealTimeData); break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); // 持久化消息 ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); // 持久化消息 PlatformRestartReply platformRestartReply = new PlatformRestartReply(); BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); platformRestartReplyService.create(platformRestartReply); break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = 
message.getQrCodeDeliveryReplyMessage(); log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); log.info("安全监测-业务消息处理:{}",securityDetectionMessage); SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(securityDetectionMessage,securityDetection); securityDetectionService.create(securityDetection); SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); BeanUtils.copyProperties(securityDetection, securityDetection1); chargingOrderClient.securityDetection(securityDetection1); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); break; } } } // // 业务处理 // TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); // if(Objects.nonNull(chargingOrderBms)){ // chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); // chargingOrderClient.updateChargingOrder(chargingOrderBms); // } // break; // case SendTagConstant.BMS_INFORMATION: // BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); // log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); // // 持久化消息 // BmsInformation bmsInformation = new BmsInformation(); // 
BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); // bmsInformationService.create(bmsInformation); // break; // case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: // ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); // log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); // // 持久化消息 // ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); // BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); // chargingPileStartsChargingService.create(chargingPileStartsCharging); // break; // case SendTagConstant.PLATFORM_START_CHARGING_REPLY: // PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); // log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); // // 持久化消息 // PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); // BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); // platformStartChargingReplyService.create(platformStartChargingReply); // // // 业务处理 // PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); // BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); // chargingOrderClient.startChargeSuccessfully(message1); // break; // case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: // PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); // log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); // // 持久化消息 // PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); // BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); // platformStopChargingReplyService.create(platformStopChargingReply); // // PlatformStopChargingReplyVO platformStopChargingReply1 = new 
PlatformStopChargingReplyVO(); // BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); // chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); // break; // case SendTagConstant.TRANSACTION_RECORD: // TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); // log.info("交易记录-业务消息处理:{}",transactionRecordMessage); // transactionRecordMessage.setResult(JSONObject.toJSONString(message)); // // 持久化消息 // TransactionRecord transactionRecord = new TransactionRecord(); // BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); // transactionRecordService.create(transactionRecord); // // // 业务处理 // TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); // if(Objects.nonNull(chargingOrderRecord)){ // chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); // chargingOrderClient.updateChargingOrder(chargingOrderRecord); // } // //计算费用 // TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); // BeanUtils.copyProperties(transactionRecordMessage,vo); // int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); // if(200 != code){ // //失败后添加到队列中继续处理数据 // redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); // } // // // // 添加实时上传记录结束记录 // // 查询mogondb上一条数据 // UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); // if(Objects.nonNull(data) && data.getStatus() != 5){ // UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); // BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); // uploadRealTimeMonitoringData.setStatus(5); // uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); // } // break; // case 
SendTagConstant.UPDATE_BALANCE_REPLY: // UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); // log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); // // 持久化消息 // UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); // BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); // updateBalanceReplyService.create(updateBalanceReply); // break; // case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: // SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); // log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); // // 持久化消息 // SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); // BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); // synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); // break; // case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: // ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); // log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); // // 持久化消息 // ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); // BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); // clearOfflineCardReplyService.create(clearOfflineCardReply); // break; // case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: // WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); // log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); // // 持久化消息 // WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); // BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); // workingParameterSettingReplyService.create(workingParameterSettingReply); // break; // case 
SendTagConstant.TIMING_SETTING: // TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); // log.info("对时设置-业务消息处理:{}",timingSettingMessage); // // 持久化消息 // TimingSetting timingSetting = new TimingSetting(); // BeanUtils.copyProperties(timingSettingMessage,timingSetting); // timingSettingService.create(timingSetting); // break; // case SendTagConstant.SETUP_BILLING_MODEL_REPLY: // SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); // log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); // // 持久化消息 // SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); // BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); // setupBillingModelReplyService.create(setupBillingModelReply); // break; // case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: // GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); // log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); // // 持久化消息 // GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); // BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); // groundLockRealTimeDataService.create(groundLockRealTimeData); // break; // case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: // ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); // log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); // // 持久化消息 // ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); // BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); // chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); // break; // case SendTagConstant.PLATFORM_RESTART_REPLY: // 
PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); // log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); // // 持久化消息 // PlatformRestartReply platformRestartReply = new PlatformRestartReply(); // BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); // platformRestartReplyService.create(platformRestartReply); // break; // case SendTagConstant.QR_CODE_DELIVERY_REPLY: // QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); // log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); // QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); // BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); // qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); // break; // case SendTagConstant.SECURITY_DETECTION: // SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); // log.info("安全监测-业务消息处理:{}",securityDetectionMessage); // SecurityDetection securityDetection = new SecurityDetection(); // BeanUtils.copyProperties(securityDetectionMessage,securityDetection); // securityDetectionService.create(securityDetection); // // SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); // BeanUtils.copyProperties(securityDetection, securityDetection1); // chargingOrderClient.securityDetection(securityDetection1); // break; // default: // PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); // log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); // PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); // BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); // platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); // break; // } // } // // //} 
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java
@@ -64,7 +64,7 @@ Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TAGS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); log.info("[{}]同步消息[{}]---->发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } ruoyi-service/ruoyi-integration/src/test/java/com/ruoyi/integration/RuoYiIntegrationApplicationTests.java
@@ -30,10 +30,12 @@ import com.ruoyi.other.api.domain.Operator; import com.ruoyi.other.api.feignClient.OperatorClient; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cglib.core.Local; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; import javax.annotation.Resource; @@ -50,395 +52,5 @@ @Slf4j @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RuoYiIntegrationApplication.class) public class RuoYiIntegrationApplicationTests { @Resource private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @Resource private ParkingOrderService parkingOrderService; @Resource private SiteClient siteClient; @Resource private ChargingGunClient chargingGunClient; @Resource private ChargingOrderClient chargingOrderClient; @Resource private OperatorClient operatorClient; @Test public void test(){ /** * 推送充电设备接口状态信息 * @param chargingGun * @return */ List<TChargingGun> data = chargingGunClient.getAllGun().getData(); TChargingGun chargingGun = data.stream().filter(e -> e.getId() == 61).findFirst().orElse(new TChargingGun()); ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo(); connectorStatusInfo.setOperatorID("906171535"); connectorStatusInfo.setEquipmentOwnerID("906171535"); connectorStatusInfo.setStationID(String.valueOf(chargingGun.getSiteId())); connectorStatusInfo.setEquipmentID(String.valueOf(chargingGun.getChargingPileId())); connectorStatusInfo.setConnectorID(chargingGun.getFullNumber()); connectorStatusInfo.setEquipmentClassification(1); switch (chargingGun.getStatus()){ case 1: connectorStatusInfo.setStatus(0); break; case 2: connectorStatusInfo.setStatus(1); break; case 3: connectorStatusInfo.setStatus(2); break; case 4: 
connectorStatusInfo.setStatus(3); break; case 5: connectorStatusInfo.setStatus(3); break; case 6: connectorStatusInfo.setStatus(4); break; case 7: connectorStatusInfo.setStatus(255); break; } connectorStatusInfo.setUpdateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); // List<Operator> operators = operatorClient.getAllOperator().getData(); // for (Operator operator : operators) { tcecSuperviseUtil.notificationStationStatus(new Operator(), connectorStatusInfo); // } } @Autowired private TCECSuperviseUtil tcecSuperviseUtil; private final static String operatorId = "906171535"; @Test public void test1(){ TChargingOrder chargingOrder = chargingOrderClient.orderDetail(1884874763556048898L).getData(); SupEquipChargeStatus supEquipChargeStatus = new SupEquipChargeStatus(); supEquipChargeStatus.setOperatorID(operatorId); supEquipChargeStatus.setEquipmentOwnerID(operatorId); supEquipChargeStatus.setStationID(String.valueOf(chargingOrder.getSiteId())); supEquipChargeStatus.setEquipmentID(String.valueOf(chargingOrder.getChargingPileId())); supEquipChargeStatus.setOrderNo(operatorId+chargingOrder.getCode()); switch (chargingOrder.getStatus()){ case 2: supEquipChargeStatus.setConnectorStatus(1); break; case 3: supEquipChargeStatus.setConnectorStatus(2); break; case 4: supEquipChargeStatus.setConnectorStatus(3); break; case 5: supEquipChargeStatus.setConnectorStatus(4); break; } TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData(); supEquipChargeStatus.setConnectorID(chargingGun.getFullNumber()); supEquipChargeStatus.setEquipmentClassification(1); supEquipChargeStatus.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); switch (chargingGun.getStatus()){ case 1: supEquipChargeStatus.setConnectorStatus(0); break; case 2: supEquipChargeStatus.setConnectorStatus(1); break; case 3: supEquipChargeStatus.setConnectorStatus(2); break; case 4: 
supEquipChargeStatus.setConnectorStatus(3); break; case 5: supEquipChargeStatus.setConnectorStatus(3); break; case 6: supEquipChargeStatus.setConnectorStatus(4); break; case 7: supEquipChargeStatus.setConnectorStatus(255); break; } supEquipChargeStatus.setCurrentA(chargingOrder.getCurrent()); supEquipChargeStatus.setSOC(StringUtils.hasLength(chargingOrder.getEndSoc())?new BigDecimal(chargingOrder.getEndSoc()):new BigDecimal("1")); supEquipChargeStatus.setStartTime(chargingOrder.getStartTime() != null ? chargingOrder.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : ""); supEquipChargeStatus.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); supEquipChargeStatus.setTotalPower(chargingOrder.getElectrovalence()); tcecSuperviseUtil.notificationSupEquipChargeStatus(new Operator(), supEquipChargeStatus); } @Test public void test2(){ TChargingOrder chargingOrder = chargingOrderClient.orderDetail(1884874763556048898L).getData(); SupChargeOrderInfo supChargeOrderInfo = new SupChargeOrderInfo(); supChargeOrderInfo.setOperatorID(operatorId); supChargeOrderInfo.setEquipmentOwnerID(operatorId); supChargeOrderInfo.setStationID(String.valueOf(chargingOrder.getSiteId())); supChargeOrderInfo.setEquipmentID(String.valueOf(chargingOrder.getChargingPileId())); supChargeOrderInfo.setOrderNo(operatorId+chargingOrder.getCode()); TChargingGun chargingGun = chargingGunClient.getChargingGunById(chargingOrder.getChargingGunId()).getData(); supChargeOrderInfo.setConnectorID(chargingGun.getFullNumber()); supChargeOrderInfo.setEquipmentClassification(1); supChargeOrderInfo.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); supChargeOrderInfo.setStartTime(chargingOrder.getStartTime() != null ? chargingOrder.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : ""); supChargeOrderInfo.setEndTime(chargingOrder.getEndTime() != null ? 
chargingOrder.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : ""); supChargeOrderInfo.setTotalPower(chargingOrder.getElectrovalence()); supChargeOrderInfo.setTotalElecMoney(chargingOrder.getElectrovalence()); supChargeOrderInfo.setTotalServiceMoney(chargingOrder.getServiceCharge()); supChargeOrderInfo.setTotalMoney(chargingOrder.getOrderAmount()); supChargeOrderInfo.setOrderStatus(chargingOrder.getStatus()); switch (chargingOrder.getEndMode()){ case 0: supChargeOrderInfo.setStopReason(5); supChargeOrderInfo.setStopDesc("异常终止"); break; case 1: supChargeOrderInfo.setStopReason(0); supChargeOrderInfo.setStopDesc("用户手动停止充电"); break; case 2: supChargeOrderInfo.setStopReason(1); supChargeOrderInfo.setStopDesc("客户归属地运营商平台停止充电"); break; case 3: supChargeOrderInfo.setStopReason(1); supChargeOrderInfo.setStopDesc("费用不足中止"); break; } tcecSuperviseUtil.notificationChargeOrderInfo(new Operator(), supChargeOrderInfo); } @Test public void test3(){ StationStatsInfoResult res = new StationStatsInfoResult(); List<Site> data = siteClient.getSiteAll().getData(); LocalDateTime now = LocalDateTime.now(); LocalDateTime startLocalDateTime = now.minusDays(1); LocalDateTime endLocalDateTime = now.minusDays(1); LocalDateTime localDateTime1 = LocalDateTime.of(2025, 2, 3, 0, 0, 0); LocalDateTime localDateTime2 = LocalDateTime.of(2025, 2, 3, 23, 59, 59); // 获取今天凌晨 startLocalDateTime.withHour(0); startLocalDateTime.withMinute(0); startLocalDateTime.withSecond(0); startLocalDateTime.withMonth(1); startLocalDateTime.withDayOfMonth(28); String start = DateUtils.localDateTimeToString(startLocalDateTime); endLocalDateTime.withHour(23); endLocalDateTime.withMinute(59); endLocalDateTime.withSecond(59); startLocalDateTime.withMonth(1); startLocalDateTime.withDayOfMonth(28); String end = DateUtils.localDateTimeToString(endLocalDateTime); ChargingStatisticeDTO chargingStatisticeDTO = new ChargingStatisticeDTO(); chargingStatisticeDTO.setStartTime(localDateTime1); 
chargingStatisticeDTO.setEndTime(localDateTime2); List<TChargingOrder> data1 = chargingOrderClient.getChargingStatistics(chargingStatisticeDTO).getData(); List<StationStatsInfo> stationStatsInfos = new ArrayList<>(); String start1 = DateUtils.localDateTimeToString(localDateTime1); String start2 = DateUtils.localDateTimeToString(localDateTime2); for (Site datum : data) { StationStatsInfo stationStatsInfo = new StationStatsInfo(); stationStatsInfo.setStationID(datum.getId().toString()); stationStatsInfo.setEquipmentOwnerID("906171535"); stationStatsInfo.setOperatorID("906171535"); stationStatsInfo.setStationClassification(1); stationStatsInfo.setStartTime(start1); stationStatsInfo.setEndTime(start2); List<TChargingOrder> chargingOrders = data1.stream().filter(e -> e.getSiteId().equals(datum.getId())).collect(Collectors.toList()); // 充电电量 BigDecimal electricity = new BigDecimal("0"); int chargingCount = 0; for (TChargingOrder chargingOrder : chargingOrders) { if (chargingOrder.getElectricity()!=null){ electricity = electricity.add(chargingOrder.getElectricity()); chargingCount++; } } stationStatsInfo.setStationElectricity(electricity.divide(new BigDecimal("24"),4, BigDecimal.ROUND_DOWN)); stationStatsInfo.setStationTotalChargeEnergy(electricity.setScale(4, BigDecimal.ROUND_DOWN)); stationStatsInfo.setStationTotalWarningNum(0); stationStatsInfo.setStationTotalOtherEnergy(new BigDecimal("0")); stationStatsInfo.setStationTotalChargeNum(chargingCount); //构建设备统计数据 List<EquipmentStatsInfo> EquipmentStatsInfos = new ArrayList<>(); Map<Integer, List<TChargingOrder>> collect = chargingOrders.stream().collect(Collectors.groupingBy(TChargingOrder::getChargingPileId)); for (Integer integer : collect.keySet()) { List<TChargingOrder> tChargingOrders = collect.get(integer); BigDecimal reduce1 = tChargingOrders.stream().map(TChargingOrder::getChargingCapacity).reduce(BigDecimal.ZERO, BigDecimal::add); EquipmentStatsInfo equipmentStatsInfo = new EquipmentStatsInfo(); 
equipmentStatsInfo.setEquipmentClassification(1); long chargingTime = 0L; for (TChargingOrder tChargingOrder : tChargingOrders) { // 累加充电时长 LocalDateTime startTime = tChargingOrder.getStartTime(); LocalDateTime endTime = tChargingOrder.getEndTime(); // 计算时间差 单位分钟 chargingTime += ChronoUnit.SECONDS.between(startTime, endTime)/60; } equipmentStatsInfo.setEquipmentTotalChargeTime(chargingTime); equipmentStatsInfo.setEquipmentTotalChargeNum(tChargingOrders.size()); equipmentStatsInfo.setEquipmentTotalWarningNum(0); equipmentStatsInfo.setEquipmentID(integer.toString()); equipmentStatsInfo.setEquipmentElectricity(reduce1); //构建设备接口统计数据 Map<Integer, List<TChargingOrder>> collect2 = tChargingOrders.stream().collect(Collectors.groupingBy(TChargingOrder::getChargingGunId)); List<ConnectorStatsInfo> ConnectorStatsInfos = new ArrayList<>(); for (Integer integer1 : collect2.keySet()) { List<TChargingOrder> tChargingOrders1 = collect2.get(integer1); BigDecimal reduce2 = tChargingOrders1.stream().map(TChargingOrder::getChargingCapacity).reduce(BigDecimal.ZERO, BigDecimal::add); long chargingTime1 = 0L; for (TChargingOrder chargingOrder : tChargingOrders1) { // 累加充电时长 LocalDateTime startTime = chargingOrder.getStartTime(); LocalDateTime endTime = chargingOrder.getEndTime(); // 计算时间差 单位分钟 chargingTime += ChronoUnit.SECONDS.between(startTime, endTime)/60; } TChargingGun chargingGun = chargingGunClient.getChargingGunById(integer1).getData(); ConnectorStatsInfo connectorStatsInfo = new ConnectorStatsInfo(); connectorStatsInfo.setConnectorID(chargingGun.getFullNumber()); connectorStatsInfo.setConnectorElectricity(reduce2); connectorStatsInfo.setConnectorTotalChargeTime(Integer.valueOf(chargingTime+"")); connectorStatsInfo.setConnectorTotalChargeNum(tChargingOrders1.size()); connectorStatsInfo.setConnectorTotalWarningNum(0); ConnectorStatsInfos.add(connectorStatsInfo); } equipmentStatsInfo.setConnectorStatsInfos(ConnectorStatsInfos); EquipmentStatsInfos.add(equipmentStatsInfo); } 
stationStatsInfo.setEquipmentStatsInfos(EquipmentStatsInfos); stationStatsInfos.add(stationStatsInfo); } res.setStationStatsInfos(stationStatsInfos); tcecSuperviseUtil.superviseNotificationOperationStatsInfo(res); } @Resource private ChargingPileClient chargingPileClient; @Test public void test4(){ List<Site> data = siteClient.getSiteAll().getData(); List<Integer> siteIds = data.stream().map(Site::getId).collect(Collectors.toList()); List<TChargingPile> tChargingPiles = chargingPileClient.getChargingPileBySiteIds(siteIds).getData(); List<Integer> collect1 = tChargingPiles.stream().map(TChargingPile::getId).collect(Collectors.toList()); List<TChargingGun> chargingGunList = chargingGunClient.getChargingGunByChargingPileIds(collect1).getData(); SupStationPowerInfoResult supStationPowerInfoResult = new SupStationPowerInfoResult(); List<SupStationPowerInfo> stationStatsInfos = new ArrayList<>(); LocalDateTime now = LocalDateTime.of(2025, 2, 3, 23, 30, 0); LocalDateTime startLocalDateTime = LocalDateTime.of(2025, 2, 3, 20, 0, 0); ChargingStatisticeDTO chargingStatisticeDTO = new ChargingStatisticeDTO(); chargingStatisticeDTO.setStartTime(startLocalDateTime); chargingStatisticeDTO.setEndTime(now); List<TChargingOrder> data1 = chargingOrderClient.getChargingStatistics(chargingStatisticeDTO).getData(); for (Site datum : data) { List<TChargingOrder> collect = data1.stream().filter(e -> e.getSiteId().equals(datum.getId()) &&e.getChargingPower()!=null).collect(Collectors.toList()); SupStationPowerInfo supStationPowerInfo = new SupStationPowerInfo(); supStationPowerInfo.setOperatorID("906171535"); supStationPowerInfo.setEquipmentOwnerID("906171535"); supStationPowerInfo.setStationID(datum.getId().toString()); supStationPowerInfo.setStationClassification(1); supStationPowerInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now())); if (collect.isEmpty()){ supStationPowerInfo.setStationRealTimePower(new BigDecimal("0")); }else{ BigDecimal divide = 
collect.stream().map(TChargingOrder::getChargingPower).reduce(BigDecimal.ZERO,BigDecimal::add).divide(new BigDecimal(collect.size()),4,BigDecimal.ROUND_DOWN); supStationPowerInfo.setStationRealTimePower(divide); } supStationPowerInfo.setEquipmentPowerInfos(buildEquipmentPowerInfo(datum.getId(), tChargingPiles, chargingGunList)); stationStatsInfos.add(supStationPowerInfo); } supStationPowerInfoResult.setSupStationPowerInfos(stationStatsInfos); tcecSuperviseUtil.superviseNotificationRealtimePowerInfo(supStationPowerInfoResult); } /** * 构建桩数据 * @param tChargingPiles * @return */ public List<SupEquipmentPowerInfo> buildEquipmentPowerInfo(Integer siteId, List<TChargingPile> tChargingPiles, List<TChargingGun> chargingGunList){ List<SupEquipmentPowerInfo> equipmentInfos = new ArrayList<>(); List<TChargingPile> collect = tChargingPiles.stream().filter(s -> s.getSiteId().equals(siteId)).collect(Collectors.toList()); for (TChargingPile tChargingPile : collect) { SupEquipmentPowerInfo equipmentInfo = new SupEquipmentPowerInfo(); equipmentInfo.setEquipmentID(tChargingPile.getId().toString()); equipmentInfo.setEquipmentClassification(1); equipmentInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now())); equipmentInfo.setEquipRealTimePower(tChargingPile.getRatedPower()); //构建设备接口信息 equipmentInfo.setConnectorPowerInfos(buildConnectorPowerInfos(tChargingPile.getId(), tChargingPile.getCode(), chargingGunList)); equipmentInfos.add(equipmentInfo); } return equipmentInfos; } public List<SupConnectorPowerInfo> buildConnectorPowerInfos(Integer chargingPileId, String code, List<TChargingGun> chargingGunList){ List<SupConnectorPowerInfo> connectorInfos = new ArrayList<>(); List<TChargingGun> collect = chargingGunList.stream().filter(s -> s.getChargingPileId().equals(chargingPileId)).collect(Collectors.toList()); for (TChargingGun chargingGun : collect) { SupConnectorPowerInfo connectorInfo = new SupConnectorPowerInfo(); 
connectorInfo.setConnectorID(chargingGun.getFullNumber()); connectorInfo.setEquipmentClassification(chargingGun.getEquipmentClassification()); connectorInfo.setDataTime(DateUtils.localDateTimeToString(LocalDateTime.now())); connectorInfo.setConnectorRealTimePower(chargingGun.getChargingPower()); connectorInfos.add(connectorInfo); } return connectorInfos; } private final static String query_token = "/query_token"; private static final String OperatorID = "MA01H3BQ2"; private static final String OperatorSecret = "f1331ef0b37c2d1b"; private static final String SigSecret = "a6fedf0e1b27d6f7"; private static final String DataSecret = "50a61b93919c9604"; private static final String DataSecretIV = "7c8ac6861661d584"; private final static String url = "https://dev-gov-hlht-sc.unievbj.com/evcs/v1.0.0"; @Test public void test5(){ HttpRequest post = HttpUtil.createPost(url + query_token); JSONObject info = new JSONObject(); info.put("OperatorID", "906171535"); info.put("OperatorSecret", OperatorSecret); Long timeStamp = Long.valueOf(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); post.contentType("application/json;charset=utf-8"); BaseRequestJianGuan baseRequest = new BaseRequestJianGuan(); baseRequest.setOperatorID("906171535"); baseRequest.setTimeStamp(timeStamp+""); baseRequest.setSeq("0001"); String jsonString = JacksonUtils.toJson(info); SequenceGenerator generator = new SequenceGenerator(); String nextSequence = generator.getNextSequence(); String data = AesEncryption.encrypt(DataSecret, DataSecretIV,jsonString); String hmacMD5 = HMacMD5Util.getHMacMD5("906171535",timeStamp+"", data,nextSequence,SigSecret); baseRequest.setData(data); baseRequest.setSig(hmacMD5); String request_json = JacksonUtils.toJson(baseRequest); log.info("获取三方平台授权token请求地址:" + post.getUrl()); log.info("获取三方平台授权token请求参数:" + request_json); log.info("获取三方平台授权token请求Data:" + jsonString); post.body(request_json); HttpResponse execute = post.execute(); if(200 != 
execute.getStatus()){ log.error("获取三方平台授权token失败:" + execute.body()); } log.info("获取三方平台授权token响应参数:" + execute.body()); BaseResult baseResult = com.alibaba.fastjson.JSON.parseObject(execute.body(), BaseResult.class); Integer Ret = baseResult.getRet(); if(0 != Ret){ log.error("获取三方平台授权token失败:" + baseResult.getMsg()); } //解密参数 String decrypt = AESUtil.decrypt(baseResult.getData(), DataSecret, DataSecretIV); log.info("获取三方平台授权token响应Data:" + decrypt); QueryTokenResult queryTokenResult = JSON.parseObject(decrypt, QueryTokenResult.class); String token = queryTokenResult.getAccessToken(); // Long tokenAvailableTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + queryTokenResult.getTokenAvailab System.err.println(token); } } ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/controller/TChargingOrderController.java
@@ -2332,6 +2332,15 @@
        return chargingOrderService.getNotPaymentChargingOrder();
    }

    /**
     * Manually pushes an order to the third-party platform by delegating to the order service.
     *
     * @param code code of the order to push
     * @return the result produced by {@code chargingOrderService.pushOrderInfo(code)}, unchanged
     */
    @PostMapping("/pushOrderInfo")
    public R pushOrderInfo(@RequestParam String code){
        R pushResult = chargingOrderService.pushOrderInfo(code);
        return pushResult;
    }
}
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/TChargingOrderService.java
@@ -310,4 +310,13 @@
     * @return
     */
    R getNotPaymentChargingOrder();

    /**
     * Manually pushes an order to the third-party platform.
     *
     * @param code code of the charging order to push
     * @return the operation result
     */
    R pushOrderInfo(String code);
}
ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java
@@ -697,15 +697,15 @@ log.info(chargingOrder.getCode() + ":-------------------远程调起开始充电请求-------------------" + platformStartCharging.toString()); sendMessageClient.platformStartCharging(platformStartCharging); //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 Long id = chargingOrder.getId(); //执行5分钟的定时任务检测 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(()->{ if(timingDetection(id)){ scheduler.shutdown(); } }, 5, 1, TimeUnit.SECONDS); // //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 // Long id = chargingOrder.getId(); // //执行5分钟的定时任务检测 // ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // scheduler.scheduleAtFixedRate(()->{ // if(timingDetection(id)){ // scheduler.shutdown(); // } // }, 5, 1, TimeUnit.SECONDS); return AjaxResult.success(); } @@ -3890,15 +3890,15 @@ platformStartCharging.setAccount_balance(rechargeAmount); log.info(chargingOrder.getCode() + ":-------------------远程调起开始充电请求-------------------" + platformStartCharging.toString()); sendMessageClient.platformStartCharging(platformStartCharging); //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 Long id = chargingOrder.getId(); //执行5分钟的定时任务检测 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(()->{ if(timingDetection(id)){ scheduler.shutdown(); } }, 5, 1, TimeUnit.SECONDS); // //异步线程检测远程启动的应答结果。如果失败,则需要全额退款 // Long id = chargingOrder.getId(); // //执行5分钟的定时任务检测 // ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // scheduler.scheduleAtFixedRate(()->{ // if(timingDetection(id)){ // scheduler.shutdown(); // } // }, 5, 1, TimeUnit.SECONDS); //推送三方平台订单状态 tcecClient.notificationEquipChargeStatus(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId()); @@ -3967,4 +3967,21 @@ } return R.ok(mapList); } /** * * 手动推送订单给第三方平台 * @param code * @return */ @Override public R pushOrderInfo(String code) { TChargingOrder chargingOrder = this.getOne(new 
LambdaQueryWrapper<TChargingOrder>().eq(TChargingOrder::getCode, code)); tcecClient.notificationEquipChargeStatus(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId()); tcecClient.notificationStopChargeResult(chargingOrder.getStartChargeSeq(), chargingOrder.getChargingGunId().toString(), chargingOrder.getOperatorId()); tcecClient.notificationChargeOrderInfo(chargingOrder.getStartChargeSeq(), chargingOrder.getOperatorId()); return R.ok(); } }