From c04deac8e4c1f3ae7e445026bb5a4c65af808e02 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期三, 16 四月 2025 14:28:21 +0800 Subject: [PATCH] 修改监控接口为异步上传 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java | 49 ++++++++++++++++++++++++++++++++++--------------- 1 files changed, 34 insertions(+), 15 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java index 112bc2a..ce95aeb 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java @@ -54,7 +54,7 @@ consumerGroup = "charge_charging_message", topic = "charge_charging_message", selectorExpression = "charging_message", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { @@ -138,10 +138,12 @@ @StreamListener("input") @Override protected void handleMessage(ChargingMessage message) throws Exception { + log.info("rocket收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } + log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); @@ -157,7 +159,7 @@ // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(pingMessage,ping); -// pingService.create(ping); + pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); @@ -166,11 +168,17 @@ vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); - try { - tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); - }catch (Exception e){ - System.out.println("设备状态推送监管平台失败:"+e.getMessage()); - } + new Thread(new Runnable() { + @Override + public void run() { + try { + tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); + }catch (Exception e){ + e.printStackTrace(); + System.out.println("设备状态推送监管平台失败:"+e.getMessage()); + } + } + }).start(); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -183,13 +191,19 @@ chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // 监管平台 // 查询订单信息 - try { - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); - tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); - tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); - }catch (Exception e){ - System.out.println("充电结束推送监管平台失败:"+e.getMessage()); - } + new Thread(new Runnable() { + @Override + public void run() { + try { + TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); + tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + }catch (Exception e){ + e.printStackTrace(); + System.out.println("充电结束推送监管平台失败:"+e.getMessage()); + } + } + }).start(); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -255,7 +269,12 @@ BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); - tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + new Thread(new Runnable() { + @Override + public void run() { + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + } + }).start(); } catch (Exception e) { e.printStackTrace(); -- Gitblit v1.7.1