From 2608fc201aed29d87f0edcfebcc57538b15dd2f6 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期四, 19 六月 2025 09:49:18 +0800 Subject: [PATCH] 修改单方推送站点电话 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java | 120 +++++++++++++++++++++++++---------------------------------- 1 files changed, 51 insertions(+), 69 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java index 780b998..80ee746 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageUtil.java @@ -5,13 +5,13 @@ import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; +import com.ruoyi.chargingPile.api.model.TChargingGun; import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; import com.ruoyi.integration.api.model.*; import com.ruoyi.integration.drainage.TCECPushUtil; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.mongodb.service.*; import com.ruoyi.integration.rocket.model.*; -import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import com.ruoyi.order.api.model.TChargingOrder; import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; @@ -20,12 +20,9 @@ import com.ruoyi.order.api.vo.SecurityDetectionVO; import com.ruoyi.order.api.vo.TransactionRecordMessageVO; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.client.producer.SendResult; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -105,8 +102,8 @@ private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; @Autowired private SecurityDetectionService securityDetectionService; -// @Autowired -// private TCECPushUtil tcecPushUtil; + @Autowired + private TCECPushUtil tcecPushUtil; @Resource private ChargingPileClient chargingPileClient; @@ -115,19 +112,19 @@ @Resource private RedisTemplate redisTemplate; - - - - - + @Autowired + private EnhanceProduce enhanceProduce; + + + + public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ - log.info("rocket收到的消息内容:{}",message); + log.info("收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } - log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); @@ -146,26 +143,20 @@ pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); - + UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); vo1.setGun_code(pingMessage.getCharging_gun_code()); vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); - -// ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); -// threadPoolExecutor.execute(new Runnable() { -// @Override -// public void run() { -// -// try { -//// tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); -// }catch (Exception e){ -// e.printStackTrace(); -// System.out.println("设备状态推送监管平台失败:"+e.getMessage()); -// } -// } -// }); + // 监管平台推送充电设备状态 + String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); + ChargingMessage chargingMessage = new ChargingMessage(); + chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); + GunStatusMessage gunStatusMessage = new GunStatusMessage(); + gunStatusMessage.setFullNumber(gunCode); + chargingMessage.setGunStatusMessage(gunStatusMessage); + SendResult sendResult = enhanceProduce.gunStatusMessage(chargingMessage); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -176,20 +167,20 @@ endChargeService.create(endCharge); // 业务处理 chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); -// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); -// threadPoolExecutor1.execute(new Runnable() { -// @Override -// public void run() { -// try { -// TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); -// tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); -// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); -// }catch (Exception e){ -// e.printStackTrace(); -// System.out.println("充电结束推送监管平台失败:"+e.getMessage()); -// } -// } -// }); + // 订单id + String transactionSerialNumber = endCharge.getTransaction_serial_number(); + ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); + chargingOrderMessage.setOrderNumber(transactionSerialNumber); + // 推送充电订单信息 + ChargingMessage chargingMessage1 = new ChargingMessage(); + chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); + chargingMessage1.setOrderMessage(chargingOrderMessage); + enhanceProduce.orderInfoMessage(chargingMessage1); + // 推送充电订单状态 + ChargingMessage chargingMessage2 = new ChargingMessage(); + chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); + chargingMessage2.setOrderMessage(chargingOrderMessage); + enhanceProduce.orderStatusMessage(chargingMessage2); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -216,8 +207,8 @@ acquisitionBillingModeService.create(acquisitionBillingMode); break; case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: + UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); try { - UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); // 持久化消息 UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); @@ -244,26 +235,23 @@ uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); -// uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); -// uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); if(i == 0){ log.error("数据存储mongo失败"); } - + // 业务处理 UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); -// ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); -// threadPoolExecutor2.execute(new Runnable() { -// @Override -// public void run() { -// chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); -// -// tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); -// } -// }); + // 订单id + ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); + chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); + // 推送充电订单信息 + ChargingMessage chargingMessage4 = new ChargingMessage(); + chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); + chargingMessage4.setOrderMessage(chargingOrderMessage3); + enhanceProduce.orderInfoMessage(chargingMessage4); } catch (Exception e) { e.printStackTrace(); } @@ -278,7 +266,7 @@ break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); - log.info("业务消息处理:{}",parameterSettingMessage); + log.info("参数配置-业务消息处理:{}",parameterSettingMessage); // 持久化消息 ParameterSetting parameterSetting = new ParameterSetting(); BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); @@ -291,15 +279,8 @@ BmsAbort bmsAbort = new BmsAbort(); BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); bmsAbortService.create(bmsAbort); - - ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - threadPoolExecutor3.execute(new Runnable() { - @Override - public void run() { - // 业务处理 - chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); - } - }); + // 业务处理 + chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); @@ -370,10 +351,11 @@ case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); log.info("交易记录-业务消息处理:{}",transactionRecordMessage); - transactionRecordMessage.setResult(JSONObject.toJSONString(message)); + transactionRecordMessage.setResult(JSONObject.toJSONString(transactionRecordMessage)); // 持久化消息 TransactionRecord transactionRecord = new TransactionRecord(); BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); + transactionRecord.setResult(transactionRecordMessage.getResult()); transactionRecordService.create(transactionRecord); // 业务处理 @@ -390,7 +372,6 @@ //失败后添加到队列中继续处理数据 redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); } - // 添加实时上传记录结束记录 // 查询mogondb上一条数据 @@ -494,7 +475,8 @@ break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); - log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); + log.info("业务消息处理:{}",platformRemoteUpdateReplyMessage); + // 持久化消息 PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); -- Gitblit v1.7.1