From 64d91e3c3414b928254d36c6a2c9cb0dd7262dcf Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期四, 17 四月 2025 18:56:24 +0800 Subject: [PATCH] 删除消息队列代码改为同步接口处理 --- ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java | 4 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java | 535 ++++++++++++++++++++++++++++++++++++++ ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java | 264 ++++++++++++------ 3 files changed, 705 insertions(+), 98 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java index c122e98..53b3c40 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java @@ -15,6 +15,7 @@ import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.iotda.utils.tools.StrategyUtil; import com.ruoyi.integration.rocket.model.*; +import com.ruoyi.integration.rocket.produce.ChargingMessageUtil; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; @@ -47,6 +48,13 @@ private IotMessageProduce iotMessageProduce; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; + + @Resource + private ChargingMessageUtil chargingMessageUtil; + + + + /** * 设备消息监听 * @param jsonObject @@ -77,245 +85,309 @@ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class); chargingMessage.setOnlineMessage(onlineMessage); - // 响应硬件 - // 业务处理 登录认证应答 - OnlineReply onlineReply = new OnlineReply(); - onlineReply.setCharging_pile_code(onlineMessage.getCharging_pile_code()); - onlineReply.setOnline_result(0); - result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply)); - log.info("充电桩登录认证-返回结果:{}",result); - // 响应硬件 对时设置应答 - TimingSettingReply timingSettingReplyOnline = new TimingSettingReply(); - timingSettingReplyOnline.setCharging_pile_code(onlineMessage.getCharging_pile_code()); - timingSettingReplyOnline.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); - iotMessageProduce.sendMessage(timingSettingReplyOnline.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReplyOnline)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); + new Thread(new Runnable() { + @Override + public void run() { + // 响应硬件 + // 业务处理 登录认证应答 + OnlineReply onlineReply = new OnlineReply(); + onlineReply.setCharging_pile_code(onlineMessage.getCharging_pile_code()); + onlineReply.setOnline_result(0); + String result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply)); + log.info("充电桩登录认证-返回结果:{}",result); + // 响应硬件 对时设置应答 + TimingSettingReply timingSettingReplyOnline = new TimingSettingReply(); + timingSettingReplyOnline.setCharging_pile_code(onlineMessage.getCharging_pile_code()); + timingSettingReplyOnline.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); + iotMessageProduce.sendMessage(timingSettingReplyOnline.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReplyOnline)); + } + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.PING: PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class); chargingMessage.setPingMessage(pingMessage); + chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 - Pong pong = new Pong(); - pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); - pong.setCharging_gun_code(pingMessage.getCharging_gun_code()); - pong.setCharging_gun_status(0); - iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + new Thread(new Runnable() { + @Override + public void run() { + Pong pong = new Pong(); + pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); + pong.setCharging_gun_code(pingMessage.getCharging_gun_code()); + pong.setCharging_gun_status(0); + iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong)); + } + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class); chargingMessage.setEndChargeMessage(endChargeMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class); chargingMessage.setErrorMessageMessage(errorMessageMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class); chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); + chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 - BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); - if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ - // 首次 - billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); - billingModeVerifyReply.setBilling_model_code("0"); - billingModeVerifyReply.setBilling_model_result(1); - }else { - // 查询桩使用的模版 - CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO(); - dto.setCode(billingModeVerifyMessage.getCharging_pile_code()); - dto.setStrategyDetailId(Integer.valueOf(billingModeVerifyMessage.getBilling_model_code())); - Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData(); - // 校验计费模版是否准确 - billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); - billingModeVerifyReply.setBilling_model_code(billingModeVerifyMessage.getBilling_model_code()); - if(check){ - billingModeVerifyReply.setBilling_model_result(0); - }else { - billingModeVerifyReply.setBilling_model_result(1); + new Thread(new Runnable() { + @Override + public void run() { + BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); + if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ + // 首次 + billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); + billingModeVerifyReply.setBilling_model_code("0"); + billingModeVerifyReply.setBilling_model_result(1); + }else { + // 查询桩使用的模版 + CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO(); + dto.setCode(billingModeVerifyMessage.getCharging_pile_code()); + dto.setStrategyDetailId(Integer.valueOf(billingModeVerifyMessage.getBilling_model_code())); + Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData(); + // 校验计费模版是否准确 + billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); + billingModeVerifyReply.setBilling_model_code(billingModeVerifyMessage.getBilling_model_code()); + if(check){ + billingModeVerifyReply.setBilling_model_result(0); + }else { + billingModeVerifyReply.setBilling_model_result(1); + } + } + iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply)); } - } - iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class); chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); + chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 - List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); - Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); - // 价格设置 - AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply(); - StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply); - // 时段设置 - StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply); - TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); - acquisitionBillingModeReply.setBilling_model_code(accountingStrategyDetail.getId().toString()); - acquisitionBillingModeReply.setCharging_pile_code(acquisitionBillingModeMessage.getCharging_pile_code()); - acquisitionBillingModeReply.setLoss_ratio(BigDecimal.ZERO); - iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + new Thread(new Runnable() { + @Override + public void run() { + List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); + Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); + // 价格设置 + AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply(); + StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply); + // 时段设置 + StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply); + TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); + acquisitionBillingModeReply.setBilling_model_code(accountingStrategyDetail.getId().toString()); + acquisitionBillingModeReply.setCharging_pile_code(acquisitionBillingModeMessage.getCharging_pile_code()); + acquisitionBillingModeReply.setLoss_ratio(BigDecimal.ZERO); + iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply)); + } + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: log.info("充电实时数据上传"); UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class); chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class); chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class); chargingMessage.setParameterSettingMessage(parameterSettingMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class); chargingMessage.setBmsAbortMessage(bmsAbortMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class); chargingMessage.setMotorAbortMessage(motorAbortMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class); chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class); chargingMessage.setBmsInformationMessage(bmsInformationMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class); chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 - PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); - platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); - platformConfirmationCharging.setCharging_gun_code(chargingPileStartsChargingMessage.getCharging_gun_code()); - platformConfirmationCharging.setCard_number(chargingPileStartsChargingMessage.getAccount()); - platformConfirmationCharging.setAccount_balance(BigDecimal.ZERO); - platformConfirmationCharging.setAuthentication(1); - // TODO 若是失败,给出失败原因 - iotMessageProduce.sendMessage(platformConfirmationCharging.getCharging_pile_code(), ServiceIdMenu.PLATFORM_CONFIRMATION_CHARGING.getKey(),messageUtil.platformConfirmationCharging(platformConfirmationCharging)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + new Thread(new Runnable() { + @Override + public void run() { + PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); + platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); + platformConfirmationCharging.setCharging_gun_code(chargingPileStartsChargingMessage.getCharging_gun_code()); + platformConfirmationCharging.setCard_number(chargingPileStartsChargingMessage.getAccount()); + platformConfirmationCharging.setAccount_balance(BigDecimal.ZERO); + platformConfirmationCharging.setAuthentication(1); + // TODO 若是失败,给出失败原因 + iotMessageProduce.sendMessage(platformConfirmationCharging.getCharging_pile_code(), ServiceIdMenu.PLATFORM_CONFIRMATION_CHARGING.getKey(),messageUtil.platformConfirmationCharging(platformConfirmationCharging)); + } + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class); chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class); chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class); transactionRecordMessage.setResult(content.toJSONString()); chargingMessage.setTransactionRecordMessage(transactionRecordMessage); + chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 - ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); - confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); - confirmTransactionRecord.setConfirm_result(0); - iotMessageProduce.sendMessage(transactionRecordMessage.getCharging_pile_code(), ServiceIdMenu.CONFIRM_TRANSACTION_RECORD.getKey(),messageUtil.confirmTransactionRecord(confirmTransactionRecord)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + new Thread(new Runnable() { + @Override + public void run() { + ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); + confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); + confirmTransactionRecord.setConfirm_result(0); + iotMessageProduce.sendMessage(transactionRecordMessage.getCharging_pile_code(), ServiceIdMenu.CONFIRM_TRANSACTION_RECORD.getKey(),messageUtil.confirmTransactionRecord(confirmTransactionRecord)); + } + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class); chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class); chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class); chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class); chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class); chargingMessage.setTimingSettingMessage(timingSettingMessage); + chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 对时设置应答 - TimingSettingReply timingSettingReply = new TimingSettingReply(); - timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); - timingSettingReply.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); - iotMessageProduce.sendMessage(timingSettingReply.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReply)); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + new Thread(new Runnable() { + @Override + public void run() { + TimingSettingReply timingSettingReply = new TimingSettingReply(); + timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); + timingSettingReply.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); + iotMessageProduce.sendMessage(timingSettingReply.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReply)); + } + }).start(); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class); chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class); chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class); chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class); chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class); chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class); chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class); chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); - sendResult = enhanceProduce.chargingMessage(chargingMessage); + chargingMessageUtil.handleMessage(chargingMessage); +// sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; } - log.info("rocketmq消息下发结果:{}",sendResult); +// log.info("rocketmq消息下发结果:{}",sendResult); return AjaxResult.success(); } diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java new file mode 100644 index 0000000..60bd13f --- /dev/null +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java @@ -0,0 +1,535 @@ +package com.ruoyi.integration.rocket.produce; + +import com.alibaba.fastjson.JSONObject; +import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; +import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; +import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; +import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; +import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; +import com.ruoyi.integration.api.model.*; +import com.ruoyi.integration.drainage.TCECPushUtil; +import com.ruoyi.integration.iotda.constant.SendTagConstant; +import com.ruoyi.integration.mongodb.service.*; +import com.ruoyi.integration.rocket.model.*; +import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; +import com.ruoyi.order.api.feignClient.ChargingOrderClient; +import com.ruoyi.order.api.model.TChargingOrder; +import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; +import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; +import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; +import com.ruoyi.order.api.vo.SecurityDetectionVO; +import com.ruoyi.order.api.vo.TransactionRecordMessageVO; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.math.RoundingMode; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +@Slf4j +@Component +public class ChargingMessageUtil { + + @Autowired + private AcquisitionBillingModeService acquisitionBillingModeService; + @Autowired + private BillingModeVerifyService billingModeVerifyService; + @Autowired + private BmsAbortService bmsAbortService; + @Resource + private ChargingOrderClient chargingOrderClient; + @Autowired + private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; + @Autowired + private OnlineService onlineService; + @Autowired + private PingService pingService; + @Autowired + private EndChargeService endChargeService; + @Autowired + private ErrorMessageMessageService errorMessageMessageService; + @Autowired + private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; + @Resource + private AccountingStrategyDetailClient accountingStrategyDetailClient; + @Autowired + private ChargingHandshakeService chargingHandshakeService; + @Autowired + private ParameterSettingService parameterSettingService; + @Autowired + private MotorAbortService motorAbortService; + @Autowired + private BmsInformationService bmsInformationService; + @Autowired + private ChargingPileStartsChargingService chargingPileStartsChargingService; + @Autowired + private PlatformStartChargingReplyService platformStartChargingReplyService; + @Autowired + private PlatformStopChargingReplyService platformStopChargingReplyService; + @Autowired + private TransactionRecordService transactionRecordService; + @Autowired + private UpdateBalanceReplyService updateBalanceReplyService; + @Autowired + private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; + @Autowired + private ClearOfflineCardReplyService clearOfflineCardReplyService; + @Autowired + private WorkingParameterSettingReplyService workingParameterSettingReplyService; + @Autowired + private TimingSettingService timingSettingService; + @Autowired + private SetupBillingModelReplyService setupBillingModelReplyService; + @Autowired + private GroundLockRealTimeDataService groundLockRealTimeDataService; + @Autowired + private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; + @Autowired + private PlatformRestartReplyService platformRestartReplyService; + @Autowired + private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; + @Autowired + private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; + @Autowired + private SecurityDetectionService securityDetectionService; + @Autowired + private TCECPushUtil tcecPushUtil; + + @Resource + private ChargingPileClient chargingPileClient; + @Resource + private ChargingGunClient chargingGunClient; + + @Resource + private RedisTemplate redisTemplate; + + + + + + + public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ + log.info("rocket收到的消息内容:{}",message); + String serviceId = message.getServiceId(); + if(!StringUtils.hasLength(serviceId)){ + return; + } + log.info("rocket收到的消息内容:{} {}", serviceId,message); + switch (serviceId){ + case SendTagConstant.ONLINE: + OnlineMessage onlineMessage = message.getOnlineMessage(); + log.info("充电桩登录认证业务消息处理:{}",onlineMessage); + // 持久化消息 + Online online = new Online(); + BeanUtils.copyProperties(onlineMessage,online); + onlineService.create(online); + break; + case SendTagConstant.PING: + PingMessage pingMessage = message.getPingMessage(); + log.info("充电桩心跳包-业务消息处理:{}",pingMessage); + // 持久化消息 + Ping ping = new Ping(); + BeanUtils.copyProperties(pingMessage,ping); + pingService.save(ping); + //存储缓存中,5分钟有效 + redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor.execute(new Runnable() { + @Override + public void run() { + UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); + vo1.setGun_code(pingMessage.getCharging_gun_code()); + vo1.setPile_code(pingMessage.getCharging_pile_code()); + vo1.setStatus(pingMessage.getCharging_gun_status()); + chargingPileClient.updateChargingPileStatus(vo1); + + try { + tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); + }catch (Exception e){ + e.printStackTrace(); + System.out.println("设备状态推送监管平台失败:"+e.getMessage()); + } + } + }); + break; + case SendTagConstant.END_CHARGE: + EndChargeMessage endChargeMessage = message.getEndChargeMessage(); + log.info("充电结束-业务消息处理:{}",endChargeMessage); + // 持久化消息 + EndCharge endCharge = new EndCharge(); + BeanUtils.copyProperties(endChargeMessage,endCharge); + endChargeService.create(endCharge); + ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor1.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); + try { + TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); + tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + }catch (Exception e){ + e.printStackTrace(); + System.out.println("充电结束推送监管平台失败:"+e.getMessage()); + } + } + }); + break; + case SendTagConstant.ERROR_MESSAGE: + ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); + log.info("错误报文-业务消息处理:{}",errorMessageMessage1); + // 持久化消息 + ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); + BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); + errorMessageMessageService.create(errorMessageMessage); + break; + case SendTagConstant.BILLING_MODE_VERIFY: + BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); + log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); + // 持久化消息 + BillingModeVerify billingModeVerify = new BillingModeVerify(); + BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); + billingModeVerifyService.create(billingModeVerify); + break; + case SendTagConstant.ACQUISITION_BILLING_MODE: + AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); + log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); + // 持久化消息 + AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); + BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); + acquisitionBillingModeService.create(acquisitionBillingMode); + break; + case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: + try { + UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); + log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); + // 持久化消息 + UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); + BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); + // 查询mogondb上一条数据 + UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); + // 查询订单 + TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); + // 查询当前时间段的计费策略 + TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); + uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); + uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); + if (Objects.nonNull(data)) { + uploadRealTimeMonitoringDataService.updateById(data.getId()); + uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); + uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); + uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); + }else { + log.info("首次上传实时监测数据"); + uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); + uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); + uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); + } + uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); + uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); + uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); +// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); +// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); + int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); + if(i == 0){ + log.error("数据存储mongo失败"); + } + + ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor2.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); + BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); + chargingOrderClient.chargeMonitoring(query); + chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); + + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + break; + case SendTagConstant.CHARGING_HANDSHAKE: + ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); + log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); + // 持久化消息 + ChargingHandshake chargingHandshake = new ChargingHandshake(); + BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); + chargingHandshakeService.create(chargingHandshake); + break; + case SendTagConstant.PARAMETER_SETTING: + ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); + log.info("业务消息处理:{}",parameterSettingMessage); + // 持久化消息 + ParameterSetting parameterSetting = new ParameterSetting(); + BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); + parameterSettingService.create(parameterSetting); + break; + case SendTagConstant.BMS_ABORT: + BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); + log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); + // 持久化消息 + BmsAbort bmsAbort = new BmsAbort(); + BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); + bmsAbortService.create(bmsAbort); + + ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor3.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); + } + }); + break; + case SendTagConstant.MOTOR_ABORT: + MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); + log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); + // 持久化消息 + MotorAbort motorAbort = new MotorAbort(); + BeanUtils.copyProperties(motorAbortMessage,motorAbort); + motorAbortService.create(motorAbort); + ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor4.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); + } + }); + break; + case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: + BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); + log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); + // 持久化消息 + BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); + BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); + bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); + ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor5.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderBms)){ + chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); + chargingOrderClient.updateChargingOrder(chargingOrderBms); + } + } + }); + break; + case SendTagConstant.BMS_INFORMATION: + BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); + log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); + // 持久化消息 + BmsInformation bmsInformation = new BmsInformation(); + BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); + bmsInformationService.create(bmsInformation); + break; + case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: + ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); + log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); + // 持久化消息 + ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); + BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); + chargingPileStartsChargingService.create(chargingPileStartsCharging); + break; + case SendTagConstant.PLATFORM_START_CHARGING_REPLY: + PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); + log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); + // 持久化消息 + PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); + BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); + platformStartChargingReplyService.create(platformStartChargingReply); + ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor6.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); + BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); + chargingOrderClient.startChargeSuccessfully(message1); + } + }); + break; + case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: + PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); + log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); + // 持久化消息 + PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); + BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); + platformStopChargingReplyService.create(platformStopChargingReply); + ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor7.execute(new Runnable() { + @Override + public void run() { + PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); + BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); + chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); + } + }); + break; + case SendTagConstant.TRANSACTION_RECORD: + TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); + log.info("交易记录-业务消息处理:{}",transactionRecordMessage); + transactionRecordMessage.setResult(JSONObject.toJSONString(message)); + // 持久化消息 + TransactionRecord transactionRecord = new TransactionRecord(); + BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); + transactionRecordService.create(transactionRecord); + ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor8.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderRecord)){ + chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); + chargingOrderClient.updateChargingOrder(chargingOrderRecord); + } + //计算费用 + TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); + BeanUtils.copyProperties(transactionRecordMessage,vo); + int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); + if(200 != code){ + //失败后添加到队列中继续处理数据 + redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); + } + } + }); + + + // 添加实时上传记录结束记录 + // 查询mogondb上一条数据 + UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); + if(Objects.nonNull(data) && data.getStatus() != 5){ + UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); + BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); + uploadRealTimeMonitoringData.setStatus(5); + uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); + } + break; + case SendTagConstant.UPDATE_BALANCE_REPLY: + UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); + log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); + // 持久化消息 + UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); + BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); + updateBalanceReplyService.create(updateBalanceReply); + break; + case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: + SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); + log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); + // 持久化消息 + SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); + BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); + synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); + break; + case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: + ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); + log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); + // 持久化消息 + ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); + BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); + clearOfflineCardReplyService.create(clearOfflineCardReply); + break; + case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: + WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); + log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); + // 持久化消息 + WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); + BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); + workingParameterSettingReplyService.create(workingParameterSettingReply); + break; + case SendTagConstant.TIMING_SETTING: + TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); + log.info("对时设置-业务消息处理:{}",timingSettingMessage); + // 持久化消息 + TimingSetting timingSetting = new TimingSetting(); + BeanUtils.copyProperties(timingSettingMessage,timingSetting); + timingSettingService.create(timingSetting); + break; + case SendTagConstant.SETUP_BILLING_MODEL_REPLY: + SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); + log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); + // 持久化消息 + SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); + BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); + setupBillingModelReplyService.create(setupBillingModelReply); + break; + case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: + GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); + log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); + // 持久化消息 + GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); + BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); + groundLockRealTimeDataService.create(groundLockRealTimeData); + break; + case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: + ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); + log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); + // 持久化消息 + ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); + BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); + chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); + break; + case SendTagConstant.PLATFORM_RESTART_REPLY: + PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); + log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); + // 持久化消息 + PlatformRestartReply platformRestartReply = new PlatformRestartReply(); + BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); + platformRestartReplyService.create(platformRestartReply); + break; + case SendTagConstant.QR_CODE_DELIVERY_REPLY: + QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); + log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); + QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); + BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); + qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); + break; + case SendTagConstant.SECURITY_DETECTION: + SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); + log.info("安全监测-业务消息处理:{}",securityDetectionMessage); + SecurityDetection securityDetection = new SecurityDetection(); + BeanUtils.copyProperties(securityDetectionMessage,securityDetection); + securityDetectionService.create(securityDetection); + ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor9.execute(new Runnable() { + @Override + public void run() { + SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); + BeanUtils.copyProperties(securityDetection, securityDetection1); + chargingOrderClient.securityDetection(securityDetection1); + } + }); + break; + default: + PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); + log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); + PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); + BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); + platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); + break; + } + } + + +} \ No newline at end of file diff --git a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java index 282599d..09cb9c0 100644 --- a/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java +++ b/ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java @@ -731,7 +731,7 @@ Integer counter = counter_map.get(code); PreChargeCheck preChargeCheck1 = redisService.getCacheObject(key); //5分钟内还未插枪则取消充电,退回金额。 - if(failure_cause == 5 && (null == counter || counter < 300)){ + if(failure_cause == 5 && (null == counter || counter < 1800)){ counter = (null == counter ? 0 : counter) + 1; counter_map.put(code, counter); //启动失败 @@ -772,7 +772,7 @@ log.info(code + ":-------------------未上传开启充电结果-------------------" + counter); PreChargeCheck preChargeCheck1 = redisService.getCacheObject(key); //5分钟内未启动成功,退回金额。 - if(null == counter || counter < 300){ + if(null == counter || counter < 1800){ counter = (null == counter ? 0 : counter) + 1; boot_failed_map.put(code, counter); //启动失败 -- Gitblit v1.7.1