ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/RuoYiIntegrationApplication.java
@@ -27,11 +27,7 @@ @EnableBinding({ Source.class, Sink.class }) public class RuoYiIntegrationApplication { public static void main(String[] args) { try{ SpringApplication.run(RuoYiIntegrationApplication.class, args); }catch (Exception e){ e.printStackTrace(); } System.out.println("(♥◠‿◠)ノ゙ 硬件集成模块启动成功 ლ(´ڡ`ლ)゙ \n" + " .-------. ____ __ \n" + " | _ _ \\ \\ \\ / / \n" + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/PingService.java
@@ -9,4 +9,11 @@
     * 定时删除数据
     */
    void delPing();

    /**
     * Saves a ping record.
     *
     * <p>NOTE(review): whether an existing record is updated or a new one is inserted is decided
     * by the implementing class; this declaration only fixes the signature.
     *
     * @param ping the ping record to save
     */
    void save(Ping ping);
}
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/mongodb/service/impl/PingServiceImpl.java
@@ -1,6 +1,7 @@ package com.ruoyi.integration.mongodb.service.impl; import com.mongodb.client.result.DeleteResult; import com.ruoyi.common.core.utils.StringUtils; import com.ruoyi.integration.iotda.constant.IotConstant; import com.ruoyi.integration.api.model.Ping; import com.ruoyi.integration.mongodb.service.PingService; @@ -10,8 +11,10 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.List; @Service @@ -39,4 +42,24 @@ public void delPing() { mongoTemplate.remove(new Query().addCriteria(Criteria.where("create_time").lt(LocalDateTime.now().minusDays(1))), Ping.class); } @Override public void save(Ping ping) { Query query = new Query(); if (StringUtils.isNotEmpty(ping.getCharging_pile_code())) { query.addCriteria(Criteria.where("charging_pile_code").is(ping.getCharging_pile_code())); } if (StringUtils.isNotEmpty(ping.getCharging_gun_code())){ query.addCriteria(Criteria.where("charging_gun_code").is(ping.getCharging_gun_code())); } List<Ping> pings = mongoTemplate.find(query, Ping.class); if(pings.size() > 0){ Ping ping1 = pings.get(0); ping1.setLast_time(new Date()); ping1.setCharging_gun_status(ping.getCharging_gun_status()); mongoTemplate.save(ping1); }else{ mongoTemplate.save(ping); } } } ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/ChargingMessageListener.java
@@ -54,7 +54,7 @@ consumerGroup = "charge_charging_message", topic = "charge_charging_message", selectorExpression = "charging_message", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { @@ -138,10 +138,12 @@ @StreamListener("input") @Override protected void handleMessage(ChargingMessage message) throws Exception { log.info("rocket收到的消息内容:{}",message); String serviceId = message.getServiceId(); if(!StringUtils.hasLength(serviceId)){ return; } log.info("rocket收到的消息内容:{} {}", serviceId,message); switch (serviceId){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = message.getOnlineMessage(); @@ -157,7 +159,7 @@ // 持久化消息 Ping ping = new Ping(); BeanUtils.copyProperties(pingMessage,ping); // pingService.create(ping); pingService.save(ping); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); @@ -166,11 +168,17 @@ vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); new Thread(new Runnable() { @Override public void run() { try { tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); }catch (Exception e){ e.printStackTrace(); System.out.println("设备状态推送监管平台失败:"+e.getMessage()); } } }).start(); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = message.getEndChargeMessage(); @@ -183,13 +191,19 @@ chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); // 监管平台 // 查询订单信息 new Thread(new Runnable() { @Override public void run() { try { TChargingOrder chargingOrder = 
chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); }catch (Exception e){ e.printStackTrace(); System.out.println("充电结束推送监管平台失败:"+e.getMessage()); } } }).start(); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); @@ -255,7 +269,12 @@ BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); new Thread(new Runnable() { @Override public void run() { tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); } }).start(); } catch (Exception e) { e.printStackTrace(); ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java
@@ -351,6 +351,9 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.CHARGING_MESSAGE); return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_MESSAGE, SendTagConstant.CHARGING_MESSAGE, message); }