From 8d6ef24217033e13b356502f2ade8737a43cce2b Mon Sep 17 00:00:00 2001 From: 无关风月 <443237572@qq.com> Date: 星期六, 19 四月 2025 17:52:15 +0800 Subject: [PATCH] MQ消息队列监管平台数据上传 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java | 222 ++++++++++++++++++++++++++---------------------------- 1 files changed, 107 insertions(+), 115 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java index 60bd13f..8072fab 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java @@ -5,6 +5,7 @@ import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; +import com.ruoyi.chargingPile.api.model.TChargingGun; import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.integration.api.model.*; import com.ruoyi.integration.drainage.TCECPushUtil; @@ -20,6 +21,7 @@ import com.ruoyi.order.api.vo.SecurityDetectionVO; import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; @@ -115,7 +117,8 @@ @Resource private RedisTemplate redisTemplate; - + @Autowired + private EnhanceProduce enhanceProduce; @@ -146,24 +149,22 @@ pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor.execute(new Runnable() { - @Override - public void run() { - UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); - vo1.setGun_code(pingMessage.getCharging_gun_code()); - vo1.setPile_code(pingMessage.getCharging_pile_code()); - vo1.setStatus(pingMessage.getCharging_gun_status()); - chargingPileClient.updateChargingPileStatus(vo1); - try { - tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); - }catch (Exception e){ - e.printStackTrace(); - System.out.println("设备状态推送监管平台失败:"+e.getMessage()); - } - } - }); + UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); + vo1.setGun_code(pingMessage.getCharging_gun_code()); + vo1.setPile_code(pingMessage.getCharging_pile_code()); + vo1.setStatus(pingMessage.getCharging_gun_status()); + chargingPileClient.updateChargingPileStatus(vo1); + // 监管平台推送充电设备状态 + SendResult sendResult; + String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); + ChargingMessage chargingMessage = new ChargingMessage(); + chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); + GunStatusMessage gunStatusMessage = new GunStatusMessage(); + gunStatusMessage.setFullNumber(gunCode); + chargingMessage.setGunStatusMessage(gunStatusMessage); + sendResult = enhanceProduce.gunStatusMessage(chargingMessage); + break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -172,22 +173,36 @@ EndCharge endCharge = new EndCharge(); BeanUtils.copyProperties(endChargeMessage,endCharge); endChargeService.create(endCharge); - ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor1.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); - try { - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); - tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); - tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); - }catch (Exception e){ - e.printStackTrace(); - System.out.println("充电结束推送监管平台失败:"+e.getMessage()); - } - } - }); + // 业务处理 + chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); + // 订单id + String transactionSerialNumber = endCharge.getTransaction_serial_number(); + ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); + chargingOrderMessage.setOrderNumber(transactionSerialNumber); + // 推送充电订单信息 + ChargingMessage chargingMessage1 = new ChargingMessage(); + chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); + chargingMessage1.setOrderMessage(chargingOrderMessage); + enhanceProduce.orderInfoMessage(chargingMessage1); + // 推送充电订单状态 + ChargingMessage chargingMessage2 = new ChargingMessage(); + chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); + chargingMessage2.setOrderMessage(chargingOrderMessage); + enhanceProduce.orderStatusMessage(chargingMessage2); +// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor1.execute(new Runnable() { +// @Override +// public void run() { +// try { +// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); +// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); +// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); +// }catch (Exception e){ +// e.printStackTrace(); +// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); +// } +// } +// }); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -249,19 +264,27 @@ log.error("数据存储mongo失败"); } - ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor2.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); - BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); - chargingOrderClient.chargeMonitoring(query); - chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); - - tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); - } - }); + // 业务处理 + UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); + BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); + chargingOrderClient.chargeMonitoring(query); + // 订单id + ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); + chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); + // 推送充电订单信息 + ChargingMessage chargingMessage4 = new ChargingMessage(); + chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); + chargingMessage4.setOrderMessage(chargingOrderMessage3); + enhanceProduce.orderInfoMessage(chargingMessage4); +// ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); +// threadPoolExecutor2.execute(new Runnable() { +// @Override +// public void run() { +// chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); +// +// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); +// } +// }); } catch (Exception e) { e.printStackTrace(); } @@ -306,14 +329,8 @@ MotorAbort motorAbort = new MotorAbort(); BeanUtils.copyProperties(motorAbortMessage,motorAbort); motorAbortService.create(motorAbort); - ThreadPoolExecutor threadPoolExecutor4 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor4.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); - } - }); + // 业务处理 + chargingOrderClient.excelEndCharge(motorAbort.getTransaction_serial_number()); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = message.getBmsDemandAndChargerExportationMessage(); @@ -322,18 +339,13 @@ BmsDemandAndChargerExportation bmsDemandAndChargerExportation = new BmsDemandAndChargerExportation(); BeanUtils.copyProperties(bmsDemandAndChargerExportationMessage,bmsDemandAndChargerExportation); bmsDemandAndChargerExportationService.create(bmsDemandAndChargerExportation); - ThreadPoolExecutor threadPoolExecutor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor5.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderBms)){ - chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); - chargingOrderClient.updateChargingOrder(chargingOrderBms); - } - } - }); + + // 业务处理 + TChargingOrder chargingOrderBms = chargingOrderClient.getOrderByCode(bmsDemandAndChargerExportationMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderBms)){ + chargingOrderBms.setNeedElec(bmsDemandAndChargerExportationMessage.getBms_current_requirements()); + chargingOrderClient.updateChargingOrder(chargingOrderBms); + } break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = message.getBmsInformationMessage(); @@ -358,16 +370,11 @@ PlatformStartChargingReply platformStartChargingReply = new PlatformStartChargingReply(); BeanUtils.copyProperties(platformStartChargingReplyMessage,platformStartChargingReply); platformStartChargingReplyService.create(platformStartChargingReply); - ThreadPoolExecutor threadPoolExecutor6 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor6.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); - BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); - chargingOrderClient.startChargeSuccessfully(message1); - } - }); + + // 业务处理 + PlatformStartChargingReplyMessageVO message1 = new PlatformStartChargingReplyMessageVO(); + BeanUtils.copyProperties(platformStartChargingReplyMessage, message1); + chargingOrderClient.startChargeSuccessfully(message1); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = message.getPlatformStopChargingReplyMessage(); @@ -376,15 +383,10 @@ PlatformStopChargingReply platformStopChargingReply = new PlatformStopChargingReply(); BeanUtils.copyProperties(platformStopChargingReplyMessage,platformStopChargingReply); platformStopChargingReplyService.create(platformStopChargingReply); - ThreadPoolExecutor threadPoolExecutor7 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor7.execute(new Runnable() { - @Override - public void run() { - PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); - BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); - chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); - } - }); + + PlatformStopChargingReplyVO platformStopChargingReply1 = new PlatformStopChargingReplyVO(); + BeanUtils.copyProperties(platformStopChargingReply, platformStopChargingReply1); + chargingOrderClient.terminateSuccessfulResponse(platformStopChargingReply1); break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); @@ -394,26 +396,21 @@ TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); transactionRecordService.create(transactionRecord); - ThreadPoolExecutor threadPoolExecutor8 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor8.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); - if(Objects.nonNull(chargingOrderRecord)){ - chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); - chargingOrderClient.updateChargingOrder(chargingOrderRecord); - } - //计算费用 - TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); - BeanUtils.copyProperties(transactionRecordMessage,vo); - int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); - if(200 != code){ - //失败后添加到队列中继续处理数据 - redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); - } - } - }); + + // 业务处理 + TChargingOrder chargingOrderRecord = chargingOrderClient.getOrderByCode(transactionRecordMessage.getTransaction_serial_number()).getData(); + if(Objects.nonNull(chargingOrderRecord)){ + chargingOrderRecord.setTotalElectricity(transactionRecordMessage.getTotal_electricity()); + chargingOrderClient.updateChargingOrder(chargingOrderRecord); + } + //计算费用 + TransactionRecordMessageVO vo = new TransactionRecordMessageVO(); + BeanUtils.copyProperties(transactionRecordMessage,vo); + int code = chargingOrderClient.endChargeBillingCharge(vo).getCode(); + if(200 != code){ + //失败后添加到队列中继续处理数据 + redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); + } // 添加实时上传记录结束记录 @@ -511,15 +508,10 @@ SecurityDetection securityDetection = new SecurityDetection(); BeanUtils.copyProperties(securityDetectionMessage,securityDetection); securityDetectionService.create(securityDetection); - ThreadPoolExecutor threadPoolExecutor9 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor9.execute(new Runnable() { - @Override - public void run() { - SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); - BeanUtils.copyProperties(securityDetection, securityDetection1); - chargingOrderClient.securityDetection(securityDetection1); - } - }); + + SecurityDetectionVO securityDetection1 = new SecurityDetectionVO(); + BeanUtils.copyProperties(securityDetection, securityDetection1); + chargingOrderClient.securityDetection(securityDetection1); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); -- Gitblit v1.7.1