ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
@@ -15,6 +15,7 @@ import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.iotda.utils.tools.StrategyUtil; import com.ruoyi.integration.rocket.model.*; import com.ruoyi.integration.rocket.produce.ChargingMessageUtil; import com.ruoyi.integration.rocket.produce.EnhanceProduce; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; @@ -47,6 +48,13 @@ private IotMessageProduce iotMessageProduce; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; @Resource private ChargingMessageUtil chargingMessageUtil; /** * 设备消息监听 * @param jsonObject @@ -77,245 +85,309 @@ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class); chargingMessage.setOnlineMessage(onlineMessage); // 响应硬件 // 业务处理 登录认证应答 OnlineReply onlineReply = new OnlineReply(); onlineReply.setCharging_pile_code(onlineMessage.getCharging_pile_code()); onlineReply.setOnline_result(0); result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply)); log.info("充电桩登录认证-返回结果:{}",result); // 响应硬件 对时设置应答 TimingSettingReply timingSettingReplyOnline = new TimingSettingReply(); timingSettingReplyOnline.setCharging_pile_code(onlineMessage.getCharging_pile_code()); timingSettingReplyOnline.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); iotMessageProduce.sendMessage(timingSettingReplyOnline.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReplyOnline)); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); new Thread(new Runnable() { @Override public void run() { // 响应硬件 // 业务处理 登录认证应答 OnlineReply onlineReply = new OnlineReply(); onlineReply.setCharging_pile_code(onlineMessage.getCharging_pile_code()); onlineReply.setOnline_result(0); String result = iotMessageProduce.sendMessage(onlineReply.getCharging_pile_code(), ServiceIdMenu.ONLINE_REPLY.getKey(), messageUtil.onlineReply(onlineReply)); log.info("充电桩登录认证-返回结果:{}",result); // 响应硬件 对时设置应答 TimingSettingReply timingSettingReplyOnline = new TimingSettingReply(); timingSettingReplyOnline.setCharging_pile_code(onlineMessage.getCharging_pile_code()); timingSettingReplyOnline.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); iotMessageProduce.sendMessage(timingSettingReplyOnline.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReplyOnline)); } }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.PING: PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class); chargingMessage.setPingMessage(pingMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 Pong pong = new Pong(); pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); pong.setCharging_gun_code(pingMessage.getCharging_gun_code()); pong.setCharging_gun_status(0); iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong)); sendResult = enhanceProduce.chargingMessage(chargingMessage); new Thread(new Runnable() { @Override public void run() { Pong pong = new Pong(); pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); pong.setCharging_gun_code(pingMessage.getCharging_gun_code()); pong.setCharging_gun_status(0); iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong)); } }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class); chargingMessage.setEndChargeMessage(endChargeMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class); chargingMessage.setErrorMessageMessage(errorMessageMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class); chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ // 首次 billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); billingModeVerifyReply.setBilling_model_code("0"); billingModeVerifyReply.setBilling_model_result(1); }else { // 查询桩使用的模版 CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO(); dto.setCode(billingModeVerifyMessage.getCharging_pile_code()); dto.setStrategyDetailId(Integer.valueOf(billingModeVerifyMessage.getBilling_model_code())); Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData(); // 校验计费模版是否准确 billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); billingModeVerifyReply.setBilling_model_code(billingModeVerifyMessage.getBilling_model_code()); if(check){ billingModeVerifyReply.setBilling_model_result(0); }else { billingModeVerifyReply.setBilling_model_result(1); new Thread(new Runnable() { @Override public void run() { BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); if(billingModeVerifyMessage.getBilling_model_code().equals("0")){ // 首次 billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); billingModeVerifyReply.setBilling_model_code("0"); billingModeVerifyReply.setBilling_model_result(1); }else { // 查询桩使用的模版 CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO(); dto.setCode(billingModeVerifyMessage.getCharging_pile_code()); dto.setStrategyDetailId(Integer.valueOf(billingModeVerifyMessage.getBilling_model_code())); Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData(); // 校验计费模版是否准确 billingModeVerifyReply.setCharging_pile_code(billingModeVerifyMessage.getCharging_pile_code()); billingModeVerifyReply.setBilling_model_code(billingModeVerifyMessage.getBilling_model_code()); if(check){ billingModeVerifyReply.setBilling_model_result(0); }else { billingModeVerifyReply.setBilling_model_result(1); } } iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply)); } } iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply)); sendResult = enhanceProduce.chargingMessage(chargingMessage); }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class); chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); // 价格设置 AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply(); StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply); // 时段设置 StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply); TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); acquisitionBillingModeReply.setBilling_model_code(accountingStrategyDetail.getId().toString()); acquisitionBillingModeReply.setCharging_pile_code(acquisitionBillingModeMessage.getCharging_pile_code()); acquisitionBillingModeReply.setLoss_ratio(BigDecimal.ZERO); iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply)); sendResult = enhanceProduce.chargingMessage(chargingMessage); new Thread(new Runnable() { @Override public void run() { List<TAccountingStrategyDetail> accountingStrategyDetails = accountingStrategyDetailClient.getDetailListByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); Map<Integer, TAccountingStrategyDetail> strategyPrice = StrategyUtil.getStrategyPrice(accountingStrategyDetails); // 价格设置 AcquisitionBillingModeReply acquisitionBillingModeReply = new AcquisitionBillingModeReply(); StrategyUtil.setStrategyPrice(strategyPrice,acquisitionBillingModeReply); // 时段设置 StrategyUtil.setTime(accountingStrategyDetails,acquisitionBillingModeReply); TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailByCode(acquisitionBillingModeMessage.getCharging_pile_code()).getData(); acquisitionBillingModeReply.setBilling_model_code(accountingStrategyDetail.getId().toString()); acquisitionBillingModeReply.setCharging_pile_code(acquisitionBillingModeMessage.getCharging_pile_code()); acquisitionBillingModeReply.setLoss_ratio(BigDecimal.ZERO); iotMessageProduce.sendMessage(acquisitionBillingModeReply.getCharging_pile_code(), ServiceIdMenu.ACQUISITION_BILLING_MODE_REPLY.getKey(),messageUtil.acquisitionBillingModeReply(acquisitionBillingModeReply)); } }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: log.info("充电实时数据上传"); UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class); chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class); chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class); chargingMessage.setParameterSettingMessage(parameterSettingMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class); chargingMessage.setBmsAbortMessage(bmsAbortMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class); chargingMessage.setMotorAbortMessage(motorAbortMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class); chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class); chargingMessage.setBmsInformationMessage(bmsInformationMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class); chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); platformConfirmationCharging.setCharging_gun_code(chargingPileStartsChargingMessage.getCharging_gun_code()); platformConfirmationCharging.setCard_number(chargingPileStartsChargingMessage.getAccount()); platformConfirmationCharging.setAccount_balance(BigDecimal.ZERO); platformConfirmationCharging.setAuthentication(1); // TODO 若是失败,给出失败原因 iotMessageProduce.sendMessage(platformConfirmationCharging.getCharging_pile_code(), ServiceIdMenu.PLATFORM_CONFIRMATION_CHARGING.getKey(),messageUtil.platformConfirmationCharging(platformConfirmationCharging)); sendResult = enhanceProduce.chargingMessage(chargingMessage); new Thread(new Runnable() { @Override public void run() { PlatformConfirmationCharging platformConfirmationCharging = new PlatformConfirmationCharging(); platformConfirmationCharging.setCharging_pile_code(chargingPileStartsChargingMessage.getCharging_pile_code()); platformConfirmationCharging.setCharging_gun_code(chargingPileStartsChargingMessage.getCharging_gun_code()); platformConfirmationCharging.setCard_number(chargingPileStartsChargingMessage.getAccount()); platformConfirmationCharging.setAccount_balance(BigDecimal.ZERO); platformConfirmationCharging.setAuthentication(1); // TODO 若是失败,给出失败原因 iotMessageProduce.sendMessage(platformConfirmationCharging.getCharging_pile_code(), ServiceIdMenu.PLATFORM_CONFIRMATION_CHARGING.getKey(),messageUtil.platformConfirmationCharging(platformConfirmationCharging)); } }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class); chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class); chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class); transactionRecordMessage.setResult(content.toJSONString()); chargingMessage.setTransactionRecordMessage(transactionRecordMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); confirmTransactionRecord.setConfirm_result(0); iotMessageProduce.sendMessage(transactionRecordMessage.getCharging_pile_code(), ServiceIdMenu.CONFIRM_TRANSACTION_RECORD.getKey(),messageUtil.confirmTransactionRecord(confirmTransactionRecord)); sendResult = enhanceProduce.chargingMessage(chargingMessage); new Thread(new Runnable() { @Override public void run() { ConfirmTransactionRecord confirmTransactionRecord = new ConfirmTransactionRecord(); confirmTransactionRecord.setTransaction_serial_number(transactionRecordMessage.getTransaction_serial_number()); confirmTransactionRecord.setConfirm_result(0); iotMessageProduce.sendMessage(transactionRecordMessage.getCharging_pile_code(), ServiceIdMenu.CONFIRM_TRANSACTION_RECORD.getKey(),messageUtil.confirmTransactionRecord(confirmTransactionRecord)); } }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class); chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class); chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class); chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class); chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class); chargingMessage.setTimingSettingMessage(timingSettingMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 对时设置应答 TimingSettingReply timingSettingReply = new TimingSettingReply(); timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); timingSettingReply.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); iotMessageProduce.sendMessage(timingSettingReply.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReply)); sendResult = enhanceProduce.chargingMessage(chargingMessage); new Thread(new Runnable() { @Override public void run() { TimingSettingReply timingSettingReply = new TimingSettingReply(); timingSettingReply.setCharging_pile_code(timingSettingMessage.getCharging_pile_code()); timingSettingReply.setCurrent_time(CP56Time2aConverter.convertToCP56Time2a(new Date())); iotMessageProduce.sendMessage(timingSettingReply.getCharging_pile_code(), ServiceIdMenu.TIMING_SETTING_REPLY.getKey(),messageUtil.timingSettingReply(timingSettingReply)); } }).start(); // sendResult = enhanceProduce.chargingMessage(chargingMessage); break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class); chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class); chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class); chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class); chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class); chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class); chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class); chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); sendResult = enhanceProduce.chargingMessage(chargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // sendResult = enhanceProduce.chargingMessage(chargingMessage); // 响应硬件 break; } log.info("rocketmq消息下发结果:{}",sendResult); // log.info("rocketmq消息下发结果:{}",sendResult); return AjaxResult.success(); } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
@@ -45,6 +45,8 @@ import java.util.Date; import java.util.Objects; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @@ -162,15 +164,16 @@ pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(pingMessage.getCharging_gun_code()); vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); new Thread(new Runnable() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor.execute(new Runnable() { @Override public void run() { UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(pingMessage.getCharging_gun_code()); vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); try { tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); }catch (Exception e){ @@ -178,7 +181,7 @@ System.out.println("设备状态推送监管平台失败:"+e.getMessage()); } } }).start(); }); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -187,13 +190,12 @@ EndCharge endCharge = new EndCharge(); BeanUtils.copyProperties(endChargeMessage,endCharge); endChargeService.create(endCharge); // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // 监管平台 // 查询订单信息 new Thread(new Runnable() { ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor1.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); try { TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); @@ -203,7 +205,7 @@ System.out.println("充电结束推送监管平台失败:"+e.getMessage()); } } }).start(); }); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -264,18 +266,20 @@ if(i == 0){ log.error("数据存储mongo失败"); } // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); new Thread(new Runnable() { ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor2.execute(new Runnable() { @Override public void run() { // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); } }).start(); }); } catch (Exception e) { e.printStackTrace(); } @@ -303,8 +307,15 @@ BmsAbort bmsAbort = new BmsAbort(); BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); bmsAbortService.create(bmsAbort); // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor3.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); } }); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); @@ -313,8 +324,14 @@ MotorAbort motorAbort = new MotorAbort(); BeanUtils.copyProperties(motorAbortMessage,motorAbort); motorAbortService.create(motorAbort); // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor4.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); } }); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); @@ -323,12 +340,18 @@ BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); // 业务处理 TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderBms)){ chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrderBms); } ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor5.execute(new Runnable() { @Override public void run() { // 业务处理 TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderBms)){ chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrderBms); } } }); break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); @@ -353,10 +376,16 @@ PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); // 业务处理 PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); chargingOrderClient.startChargeSuccessfully(message1); ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor6.execute(new Runnable() { @Override public void run() { // 业务处理 PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); chargingOrderClient.startChargeSuccessfully(message1); } }); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); @@ -365,9 +394,15 @@ PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor7.execute(new Runnable() { @Override public void run() { PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); } }); break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); @@ -377,20 +412,26 @@ TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); transactionRecordService.create(transactionRecord); // 业务处理 TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderRecord)){ chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrderRecord); } //计算费用 TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(transactionRecordMessage,vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 != code){ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); } ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor8.execute(new Runnable() { @Override public void run() { // 业务处理 TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderRecord)){ chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrderRecord); } //计算费用 TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(transactionRecordMessage,vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 != code){ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); } } }); // 添加实时上传记录结束记录 @@ -488,9 +529,15 @@ SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(securityDetectionMessage,securityDetection); securityDetectionService.create(securityDetection); SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); BeanUtils.copyProperties(securityDetection, securityDetection1); chargingOrderClient.securityDetection(securityDetection1); ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor9.execute(new Runnable() { @Override public void run() { SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); BeanUtils.copyProperties(securityDetection, securityDetection1); chargingOrderClient.securityDetection(securityDetection1); } }); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java
New file @@ -0,0 +1,535 @@ package com.ruoyi.integration.rocket.produce; import com.alibaba.fastjson.JSONObject; import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient; import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.integration.api.model.*; import com.ruoyi.integration.drainage.TCECPushUtil; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.*; import com.ruoyi.integration.rocket.model.*; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; import com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO; import com.ruoyi.order.api.vo.PlatformStopChargingReplyVO; import com.ruoyi.order.api.vo.SecurityDetectionVO; import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.math.RoundingMode; import java.util.Objects; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @Component public class ChargingMessageUtil { @Autowired private AcquisitionBillingModeService acquisitionBillingModeService; @Autowired private BillingModeVerifyService billingModeVerifyService; @Autowired private BmsAbortService bmsAbortService; @Resource private ChargingOrderClient chargingOrderClient; @Autowired private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService; @Autowired private OnlineService onlineService; @Autowired private PingService pingService; @Autowired private EndChargeService endChargeService; @Autowired private ErrorMessageMessageService errorMessageMessageService; @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @Resource private AccountingStrategyDetailClient accountingStrategyDetailClient; @Autowired private ChargingHandshakeService chargingHandshakeService; @Autowired private ParameterSettingService parameterSettingService; @Autowired private MotorAbortService motorAbortService; @Autowired private BmsInformationService bmsInformationService; @Autowired private ChargingPileStartsChargingService chargingPileStartsChargingService; @Autowired private PlatformStartChargingReplyService platformStartChargingReplyService; @Autowired private PlatformStopChargingReplyService platformStopChargingReplyService; @Autowired private TransactionRecordService transactionRecordService; @Autowired private UpdateBalanceReplyService updateBalanceReplyService; @Autowired private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService; @Autowired private ClearOfflineCardReplyService clearOfflineCardReplyService; @Autowired private WorkingParameterSettingReplyService workingParameterSettingReplyService; @Autowired private TimingSettingService timingSettingService; @Autowired private SetupBillingModelReplyService setupBillingModelReplyService; @Autowired private GroundLockRealTimeDataService groundLockRealTimeDataService; @Autowired private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService; @Autowired private PlatformRestartReplyService platformRestartReplyService; @Autowired private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService; @Autowired private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; @Autowired private SecurityDetectionService securityDetectionService; @Autowired private TCECPushUtil tcecPushUtil; @Resource private ChargingPileClient chargingPileClient; @Resource private ChargingGunClient chargingGunClient; @Resource private RedisTemplate redisTemplate; public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ log.info("rocket收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); log.info("充电桩登录认证业务消息处理:{}",onlineMessage); // 持久化消息 Online online = new Online(); BeanUtils.copyProperties(onlineMessage,online); onlineService.create(online); break; case SendTagConstant.PING: PingMessage pingMessage = message.getPingMessage(); log.info("充电桩心跳包-业务消息处理:{}",pingMessage); // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(pingMessage,ping); pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor.execute(new Runnable() { @Override public void run() { UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(pingMessage.getCharging_gun_code()); vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); try { tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); }catch (Exception e){ e.printStackTrace(); System.out.println("设备状态推送监管平台失败:"+e.getMessage()); } } }); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); log.info("充电结束-业务消息处理:{}",endChargeMessage); // 持久化消息 EndCharge endCharge = new EndCharge(); BeanUtils.copyProperties(endChargeMessage,endCharge); endChargeService.create(endCharge); ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor1.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); try { TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); }catch (Exception e){ e.printStackTrace(); System.out.println("充电结束推送监管平台失败:"+e.getMessage()); } } }); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); log.info("错误报文-业务消息处理:{}",errorMessageMessage1); // 持久化消息 ErrorMessageMessage errorMessageMessage = new ErrorMessageMessage(); BeanUtils.copyProperties(errorMessageMessage1,errorMessageMessage); errorMessageMessageService.create(errorMessageMessage); break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = message.getBillingModeVerifyMessage(); log.info("计费模型验证请求-业务消息处理:{}",billingModeVerifyMessage); // 持久化消息 BillingModeVerify billingModeVerify = new BillingModeVerify(); BeanUtils.copyProperties(billingModeVerifyMessage,billingModeVerify); billingModeVerifyService.create(billingModeVerify); break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = message.getAcquisitionBillingModeMessage(); log.info("充电桩计费模型请求-业务消息处理:{}",acquisitionBillingModeMessage); // 持久化消息 AcquisitionBillingMode acquisitionBillingMode = new AcquisitionBillingMode(); BeanUtils.copyProperties(acquisitionBillingModeMessage,acquisitionBillingMode); acquisitionBillingModeService.create(acquisitionBillingMode); break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: try { UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); // 持久化消息 UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(uploadRealTimeMonitoringDataMessage,uploadRealTimeMonitoringData); // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()); // 查询订单 TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(uploadRealTimeMonitoringDataMessage.getTransaction_serial_number()).getData(); // 查询当前时间段的计费策略 TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); if (Objects.nonNull(data)) { uploadRealTimeMonitoringDataService.updateById(data.getId()); uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount().subtract(data.getPaid_amount())); uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree().subtract(data.getCharging_degree())); uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); }else { log.info("首次上传实时监测数据"); uploadRealTimeMonitoringData.setPeriod_electric_price(uploadRealTimeMonitoringDataMessage.getPaid_amount()); uploadRealTimeMonitoringData.setPeriod_charging_degree(uploadRealTimeMonitoringDataMessage.getCharging_degree()); uploadRealTimeMonitoringData.setPeriod_service_price(uploadRealTimeMonitoringDataMessage.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); } uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); // uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); // uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); if(i == 0){ log.error("数据存储mongo失败"); } ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor2.execute(new Runnable() { @Override public void run() { // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); } }); } catch (Exception e) { e.printStackTrace(); } break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = message.getChargingHandshakeMessage(); log.info("充电握手-业务消息处理:{}",chargingHandshakeMessage); // 持久化消息 ChargingHandshake chargingHandshake = new ChargingHandshake(); BeanUtils.copyProperties(chargingHandshakeMessage,chargingHandshake); chargingHandshakeService.create(chargingHandshake); break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); log.info("业务消息处理:{}",parameterSettingMessage); // 持久化消息 ParameterSetting parameterSetting = new ParameterSetting(); BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); parameterSettingService.create(parameterSetting); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = message.getBmsAbortMessage(); log.info("充电阶段BMS中止-业务消息处理:{}",bmsAbortMessage); // 持久化消息 BmsAbort bmsAbort = new BmsAbort(); BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); bmsAbortService.create(bmsAbort); ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor3.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); } }); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); log.info("充电阶段充电机中止-业务消息处理:{}",motorAbortMessage); // 持久化消息 MotorAbort motorAbort = new MotorAbort(); BeanUtils.copyProperties(motorAbortMessage,motorAbort); motorAbortService.create(motorAbort); ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor4.execute(new Runnable() { @Override public void run() { // 业务处理 chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); } }); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); log.info("充电过程BMS需求、充电机输出-业务消息处理:{}",bmsDemandAndChargerExportationMessage); // 持久化消息 BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor5.execute(new Runnable() { @Override public void run() { // 业务处理 TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderBms)){ chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); chargingOrderClient.updateChargingOrder(chargingOrderBms); } } }); break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); log.info("充电过程BMS信息-业务消息处理:{}",bmsInformationMessage); // 持久化消息 BmsInformation bmsInformation = new BmsInformation(); BeanUtils.copyProperties(bmsInformationMessage,bmsInformation); bmsInformationService.create(bmsInformation); break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = message.getChargingPileStartsChargingMessage(); log.info("充电桩主动申请启动充电-业务消息处理:{}",chargingPileStartsChargingMessage); // 持久化消息 ChargingPileStartsCharging chargingPileStartsCharging = new ChargingPileStartsCharging(); BeanUtils.copyProperties(chargingPileStartsChargingMessage,chargingPileStartsCharging); chargingPileStartsChargingService.create(chargingPileStartsCharging); break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = message.getPlatformStartChargingReplyMessage(); log.info("远程启机命令回复-业务消息处理:{}",platformStartChargingReplyMessage); // 持久化消息 PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor6.execute(new Runnable() { @Override public void run() { // 业务处理 PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); chargingOrderClient.startChargeSuccessfully(message1); } }); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); log.info("远程停机命令回复-业务消息处理:{}",platformStopChargingReplyMessage); // 持久化消息 PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor7.execute(new Runnable() { @Override public void run() { PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); } }); break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); log.info("交易记录-业务消息处理:{}",transactionRecordMessage); transactionRecordMessage.setResult(JSONObject.toJSONString(message)); // 持久化消息 TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); transactionRecordService.create(transactionRecord); ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor8.execute(new Runnable() { @Override public void run() { // 业务处理 TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); if(Objects.nonNull(chargingOrderRecord)){ chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); chargingOrderClient.updateChargingOrder(chargingOrderRecord); } //计算费用 TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); BeanUtils.copyProperties(transactionRecordMessage,vo); int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); if(200 != code){ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); } } }); // 添加实时上传记录结束记录 // 查询mogondb上一条数据 UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(transactionRecordMessage.getTransaction_serial_number()); if(Objects.nonNull(data) && data.getStatus() != 5){ UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); BeanUtils.copyProperties(data,uploadRealTimeMonitoringData); uploadRealTimeMonitoringData.setStatus(5); uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); } break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = message.getUpdateBalanceReplyMessage(); log.info("余额更新应答-业务消息处理:{}",updateBalanceReplyMessage); // 持久化消息 UpdateBalanceReply updateBalanceReply = new UpdateBalanceReply(); BeanUtils.copyProperties(updateBalanceReplyMessage,updateBalanceReply); updateBalanceReplyService.create(updateBalanceReply); break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = message.getSynchronizeOfflineCardReplyMessage(); log.info("卡数据同步应答-业务消息处理:{}",synchronizeOfflineCardReplyMessage); // 持久化消息 SynchronizeOfflineCardReply synchronizeOfflineCardReply = new SynchronizeOfflineCardReply(); BeanUtils.copyProperties(synchronizeOfflineCardReplyMessage,synchronizeOfflineCardReply); synchronizeOfflineCardReplyService.create(synchronizeOfflineCardReply); break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = message.getClearOfflineCardReplyMessage(); log.info("离线卡数据清除应答-业务消息处理:{}",clearOfflineCardReplyMessage); // 持久化消息 ClearOfflineCardReply clearOfflineCardReply = new ClearOfflineCardReply(); BeanUtils.copyProperties(clearOfflineCardReplyMessage,clearOfflineCardReply); clearOfflineCardReplyService.create(clearOfflineCardReply); break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = message.getWorkingParameterSettingReplyMessage(); log.info("充电桩工作参数设置应答-业务消息处理:{}",workingParameterSettingReplyMessage); // 持久化消息 WorkingParameterSettingReply workingParameterSettingReply = new WorkingParameterSettingReply(); BeanUtils.copyProperties(workingParameterSettingReplyMessage,workingParameterSettingReply); workingParameterSettingReplyService.create(workingParameterSettingReply); break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = message.getTimingSettingMessage(); log.info("对时设置-业务消息处理:{}",timingSettingMessage); // 持久化消息 TimingSetting timingSetting = new TimingSetting(); BeanUtils.copyProperties(timingSettingMessage,timingSetting); timingSettingService.create(timingSetting); break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = message.getSetupBillingModelReplyMessage(); log.info("计费模型应答-业务消息处理:{}",setupBillingModelReplyMessage); // 持久化消息 SetupBillingModelReply setupBillingModelReply = new SetupBillingModelReply(); BeanUtils.copyProperties(setupBillingModelReplyMessage,setupBillingModelReply); setupBillingModelReplyService.create(setupBillingModelReply); break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = message.getGroundLockRealTimeDataMessage(); log.info("地锁数据上送(充电桩上送)-业务消息处理:{}",groundLockRealTimeDataMessage); // 持久化消息 GroundLockRealTimeData groundLockRealTimeData = new GroundLockRealTimeData(); BeanUtils.copyProperties(groundLockRealTimeDataMessage,groundLockRealTimeData); groundLockRealTimeDataService.create(groundLockRealTimeData); break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = message.getChargingPileReturnsGroundLockDataMessage(); log.info("充电桩返回数据(上行)-业务消息处理:{}",chargingPileReturnsGroundLockDataMessage); // 持久化消息 ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData = new ChargingPileReturnsGroundLockData(); BeanUtils.copyProperties(chargingPileReturnsGroundLockDataMessage,chargingPileReturnsGroundLockData); chargingPileReturnsGroundLockDataService.create(chargingPileReturnsGroundLockData); break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = message.getPlatformRestartReplyMessage(); log.info("远程重启应答-业务消息处理:{}",platformRestartReplyMessage); // 持久化消息 PlatformRestartReply platformRestartReply = new PlatformRestartReply(); BeanUtils.copyProperties(platformRestartReplyMessage,platformRestartReply); platformRestartReplyService.create(platformRestartReply); break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = message.getQrCodeDeliveryReplyMessage(); log.info("二维码下发应答-业务消息处理:{}",qrCodeDeliveryReplyMessage); QrCodeDeliveryReply qrCodeDeliveryReply = new QrCodeDeliveryReply(); BeanUtils.copyProperties(qrCodeDeliveryReplyMessage,qrCodeDeliveryReply); qrCodeDeliveryReplyService.create(qrCodeDeliveryReply); break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = message.getSecurityDetectionMessage(); log.info("安全监测-业务消息处理:{}",securityDetectionMessage); SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(securityDetectionMessage,securityDetection); securityDetectionService.create(securityDetection); ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); threadPoolExecutor9.execute(new Runnable() { @Override public void run() { SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); BeanUtils.copyProperties(securityDetection, securityDetection1); chargingOrderClient.securityDetection(securityDetection1); } }); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); break; } } } ruoyi-service/ruoyi-order/src/main/java/com/ruoyi/order/service/impl/TChargingOrderServiceImpl.java
@@ -731,7 +731,7 @@ Integer counter = counter_map.get(code); PreChargeCheck preChargeCheck1 = redisService.getCacheObject(key); //5分钟内还未插枪则取消充电,退回金额。 if(failure_cause == 5 && (null == counter || counter < 300)){ if(failure_cause == 5 && (null == counter || counter < 1800)){ counter = (null == counter ? 0 : counter) + 1; counter_map.put(code, counter); //启动失败 @@ -772,7 +772,7 @@ log.info(code + ":-------------------未上传开启充电结果-------------------" + counter); PreChargeCheck preChargeCheck1 = redisService.getCacheObject(key); //5分钟内未启动成功,退回金额。 if(null == counter || counter < 300){ if(null == counter || counter < 1800){ counter = (null == counter ? 0 : counter) + 1; boot_failed_map.put(code, counter); //启动失败