From e0e00851c1fe82e77a013626862366f25a03ffa1 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期四, 17 四月 2025 09:56:11 +0800 Subject: [PATCH] 修改队列消息业务功能异步处理 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java | 159 ++++++++++++++++++++++++++++++++++------------------ 1 files changed, 103 insertions(+), 56 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java index ce95aeb..d4db2fc 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java @@ -45,6 +45,8 @@ import java.util.Date; import java.util.Objects; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @@ -162,15 +164,16 @@ pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); - - UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); - vo1.setGun_code(pingMessage.getCharging_gun_code()); - vo1.setPile_code(pingMessage.getCharging_pile_code()); - vo1.setStatus(pingMessage.getCharging_gun_status()); - chargingPileClient.updateChargingPileStatus(vo1); - new Thread(new Runnable() { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor.execute(new Runnable() { @Override public void run() { + UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); + vo1.setGun_code(pingMessage.getCharging_gun_code()); + vo1.setPile_code(pingMessage.getCharging_pile_code()); + vo1.setStatus(pingMessage.getCharging_gun_status()); + chargingPileClient.updateChargingPileStatus(vo1); + try { tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); }catch (Exception e){ @@ -178,7 +181,7 @@ System.out.println("设备状态推送监管平台失败:"+e.getMessage()); } } - }).start(); + }); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -187,13 +190,12 @@ EndCharge endCharge = new EndCharge(); BeanUtils.copyProperties(endChargeMessage,endCharge); endChargeService.create(endCharge); - // 业务处理 - chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); - // 监管平台 - // 查询订单信息 - new Thread(new Runnable() { + ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor1.execute(new Runnable() { @Override public void run() { + // 业务处理 + chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); try { TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); @@ -203,7 +205,7 @@ System.out.println("充电结束推送监管平台失败:"+e.getMessage()); } } - }).start(); + }); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -264,18 +266,20 @@ if(i == 0){ log.error("数据存储mongo失败"); } - // 业务处理 - UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); - BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); - chargingOrderClient.chargeMonitoring(query); - chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); - new Thread(new Runnable() { + + ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor2.execute(new Runnable() { @Override public void run() { + // 业务处理 + UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); + BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); + chargingOrderClient.chargeMonitoring(query); + chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); } - }).start(); - + }); } catch (Exception e) { e.printStackTrace(); } @@ -303,8 +307,15 @@ BmsAbort bmsAbort = new BmsAbort(); BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); bmsAbortService.create(bmsAbort); - // 业务处理 - chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); + + ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor3.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); + } + }); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); @@ -313,8 +324,14 @@ MotorAbort motorAbort = new MotorAbort(); BeanUtils.copyProperties(motorAbortMessage,motorAbort); motorAbortService.create(motorAbort); - // 业务处理 - chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); + ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor4.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); + } + }); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); @@ -323,12 +340,18 @@ BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); - // 业务处理 - TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderBms)){ - chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); - chargingOrderClient.updateChargingOrder(chargingOrderBms); - } + ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor5.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderBms)){ + chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); + chargingOrderClient.updateChargingOrder(chargingOrderBms); + } + } + }); break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); @@ -353,10 +376,16 @@ PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); - // 业务处理 - PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); - BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); - chargingOrderClient.startChargeSuccessfully(message1); + ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor6.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + PlatformStartChargingReplyMessageVO message1 = new com.ruoyi.order.api.vo.PlatformStartChargingReplyMessageVO(); + BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); + chargingOrderClient.startChargeSuccessfully(message1); + } + }); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); @@ -365,9 +394,15 @@ PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); - PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); - BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); - chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); + ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor7.execute(new Runnable() { + @Override + public void run() { + PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); + BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); + chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); + } + }); break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); @@ -377,20 +412,26 @@ TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); transactionRecordService.create(transactionRecord); - // 业务处理 - TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderRecord)){ - chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); - chargingOrderClient.updateChargingOrder(chargingOrderRecord); - } - //计算费用 - TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); - BeanUtils.copyProperties(transactionRecordMessage,vo); - int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); - if(200 != code){ - //失败后添加到队列中继续处理数据 - redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); - } + ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor8.execute(new Runnable() { + @Override + public void run() { + // 业务处理 + TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderRecord)){ + chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); + chargingOrderClient.updateChargingOrder(chargingOrderRecord); + } + //计算费用 + TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); + BeanUtils.copyProperties(transactionRecordMessage,vo); + int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); + if(200 != code){ + //失败后添加到队列中继续处理数据 + redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); + } + } + }); // 添加实时上传记录结束记录 @@ -488,9 +529,15 @@ SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(securityDetectionMessage,securityDetection); securityDetectionService.create(securityDetection); - SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); - BeanUtils.copyProperties(securityDetection, securityDetection1); - chargingOrderClient.securityDetection(securityDetection1); + ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + threadPoolExecutor9.execute(new Runnable() { + @Override + public void run() { + SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); + BeanUtils.copyProperties(securityDetection, securityDetection1); + chargingOrderClient.securityDetection(securityDetection1); + } + }); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); -- Gitblit v1.7.1