From f355ef485a56e613b71d0262c089b995d7ca10d2 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期五, 23 五月 2025 17:38:39 +0800 Subject: [PATCH] 集成MQTT对接公交主防数据 --- ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/ChannelMap.java | 31 ++++++++++++++++++++----------- 1 files changed, 20 insertions(+), 11 deletions(-) diff --git a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/ChannelMap.java b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/ChannelMap.java index 49e8521..867d549 100644 --- a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/ChannelMap.java +++ b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/ChannelMap.java @@ -7,6 +7,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -16,17 +17,17 @@ */ public class ChannelMap { //存储主链路通道 - private static final ChannelGroup SERVER_GROUP = new DefaultChannelGroup("Jtt809Server", GlobalEventExecutor.INSTANCE); + private static ChannelGroup SERVER_GROUP = new DefaultChannelGroup("Jtt809Server", GlobalEventExecutor.INSTANCE); //存储主链路ID - private static final Map<Integer, ChannelId> SERVER_ID_MAP = new ConcurrentHashMap<>(); + private static Map<Integer, ChannelId> SERVER_ID_MAP = new HashMap<>(); //存储从链路通道 - private static final ChannelGroup CLIENT_GROUP = new DefaultChannelGroup("Jtt809Client", GlobalEventExecutor.INSTANCE); + private static ChannelGroup CLIENT_GROUP = new DefaultChannelGroup("Jtt809Client", GlobalEventExecutor.INSTANCE); //存储从链路ID - private static final Map<Integer, ChannelId> CLIENT_ID_MAP = new ConcurrentHashMap<>(); + private static Map<Integer, ChannelId> CLIENT_ID_MAP = new HashMap<>(); //存储从链路连接地址和端口号 - private static final Map<Integer, UPConnect> IP_PORT = new ConcurrentHashMap<>(); + private static Map<Integer, UPConnect> IP_PORT = new HashMap<>(); //存储从链路连接重试次数 - private static final Map<Integer, Integer> TIMES = new ConcurrentHashMap<>(); + private static Map<Integer, Integer> TIMES = new HashMap<>(); /** * 保存通道 @@ -34,8 +35,9 @@ * @param key * @param channel */ - public static void addClientChannel(int key, ChannelId channel) { - CLIENT_ID_MAP.put(key, channel); + public static void addClientChannel(int key, Channel channel) { + CLIENT_ID_MAP.put(key, channel.id()); + CLIENT_GROUP.add(channel); } @@ -47,6 +49,9 @@ */ public static Channel getClientChannel(int key) { ChannelId channelId = CLIENT_ID_MAP.get(key); + if (null == channelId) { + return null; + } Channel channel = CLIENT_GROUP.find(channelId); return channel; } @@ -58,8 +63,9 @@ * @param key * @param channel */ - public static void addServerChannel(int key, ChannelId channel) { - SERVER_ID_MAP.put(key, channel); + public static void addServerChannel(int key, Channel channel) { + SERVER_ID_MAP.put(key, channel.id()); + SERVER_GROUP.add(channel); } @@ -71,6 +77,9 @@ */ public static Channel getServerChannel(int key) { ChannelId channelId = SERVER_ID_MAP.get(key); + if (null == channelId) { + return null; + } Channel channel = SERVER_GROUP.find(channelId); return channel; } @@ -102,7 +111,7 @@ * @param key * @return */ - public static int getTimes(int key) { + public static Integer getTimes(int key) { return TIMES.get(key); } -- Gitblit v1.7.1