From c04deac8e4c1f3ae7e445026bb5a4c65af808e02 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期三, 16 四月 2025 14:28:21 +0800 Subject: [PATCH] 修改监控接口为异步上传 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/impl/PingServiceImpl.java | 23 +++++++++++ ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java | 3 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java | 49 +++++++++++++++++------- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java | 6 -- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/PingService.java | 7 +++ 5 files changed, 68 insertions(+), 20 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java index 8848895..a6e9f2b 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java @@ -27,11 +27,7 @@ @EnableBinding({ Source.class, Sink.class }) public class RuoYiIntegrationApplication { public static void main(String[] args) { - try{ - SpringApplication.run(RuoYiIntegrationApplication.class, args); - }catch (Exception e){ - e.printStackTrace(); - } + SpringApplication.run(RuoYiIntegrationApplication.class, args); System.out.println("(♥◠‿◠)ノ゙ 硬件集成模块启动成功 ლ(´ڡ`ლ)゙ \n" + " .-------. ____ __ \n" + " | _ _ \\ \\ \\ / / \n" + diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/PingService.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/PingService.java index 04b2588..3b12193 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/PingService.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/PingService.java @@ -9,4 +9,11 @@ * 定时删除数据 */ void delPing(); + + + /** + * 保存数据 + * @param ping + */ + void save(Ping ping); } diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/impl/PingServiceImpl.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/impl/PingServiceImpl.java index 9489e96..a9bad84 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/impl/PingServiceImpl.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/impl/PingServiceImpl.java @@ -1,6 +1,7 @@ package com.ruoyi.integration.mongodb.service.impl; import com.mongodb.client.result.DeleteResult; +import com.ruoyi.common.core.utils.StringUtils; import com.ruoyi.integration.iotda.constant.IotConstant; import com.ruoyi.integration.api.model.Ping; import com.ruoyi.integration.mongodb.service.PingService; @@ -10,8 +11,10 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; +import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.Date; import java.util.List; @Service @@ -39,4 +42,24 @@ public void delPing() { mongoTemplate.remove(new Query().addCriteria(Criteria.where("create_time").lt(LocalDateTime.now().minusDays(1))), Ping.class); } + + @Override + public void save(Ping ping) { + Query query = new Query(); + if (StringUtils.isNotEmpty(ping.getCharging_pile_code())) { + query.addCriteria(Criteria.where("charging_pile_code").is(ping.getCharging_pile_code())); + } + if (StringUtils.isNotEmpty(ping.getCharging_gun_code())){ + query.addCriteria(Criteria.where("charging_gun_code").is(ping.getCharging_gun_code())); + } + List<Ping> pings = mongoTemplate.find(query, Ping.class); + if(pings.size() > 0){ + Ping ping1 = pings.get(0); + ping1.setLast_time(new Date()); + ping1.setCharging_gun_status(ping.getCharging_gun_status()); + mongoTemplate.save(ping1); + }else{ + mongoTemplate.save(ping); + } + } } diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java index 112bc2a..ce95aeb 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java @@ -54,7 +54,7 @@ consumerGroup = "charge_charging_message", topic = "charge_charging_message", selectorExpression = "charging_message", - consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 + consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { @@ -138,10 +138,12 @@ @StreamListener("input") @Override protected void handleMessage(ChargingMessage message) throws Exception { + log.info("rocket收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } + log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); @@ -157,7 +159,7 @@ // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(pingMessage,ping); -// pingService.create(ping); + pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); @@ -166,11 +168,17 @@ vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); - try { - tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); - }catch (Exception e){ - System.out.println("设备状态推送监管平台失败:"+e.getMessage()); - } + new Thread(new Runnable() { + @Override + public void run() { + try { + tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); + }catch (Exception e){ + e.printStackTrace(); + System.out.println("设备状态推送监管平台失败:"+e.getMessage()); + } + } + }).start(); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -183,13 +191,19 @@ chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // 监管平台 // 查询订单信息 - try { - TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); - tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); - tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); - }catch (Exception e){ - System.out.println("充电结束推送监管平台失败:"+e.getMessage()); - } + new Thread(new Runnable() { + @Override + public void run() { + try { + TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); + tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + }catch (Exception e){ + e.printStackTrace(); + System.out.println("充电结束推送监管平台失败:"+e.getMessage()); + } + } + }).start(); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -255,7 +269,12 @@ BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); - tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + new Thread(new Runnable() { + @Override + public void run() { + tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); + } + }).start(); } catch (Exception e) { e.printStackTrace(); diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java index ce9a938..eae74c8 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java @@ -351,6 +351,9 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.CHARGING_MESSAGE); + + + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_MESSAGE, SendTagConstant.CHARGING_MESSAGE, message); } -- Gitblit v1.7.1