package com.ruoyi.web.controller.webSocket; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * 基于AppUser的WebSocket连接管理器 * 支持按用户ID管理连接,一个用户可以有多个设备连接 */ @Component @Slf4j public class WebSocketUserConnectionManager { /** * 存储用户的所有连接 * Key: 用户ID (appUserId) * Value: 该用户的所有连接Channel列表 */ private static final ConcurrentHashMap> userConnections = new ConcurrentHashMap<>(); /** * 存储连接与用户的映射关系 * Key: ChannelId.asLongText() * Value: 用户ID (appUserId) */ private static final ConcurrentHashMap channelToUser = new ConcurrentHashMap<>(); /** * 存储所有连接 * Key: ChannelId.asLongText() * Value: Channel */ private static final ConcurrentHashMap allConnections = new ConcurrentHashMap<>(); /** * 添加用户连接 * @param appUserId 用户ID * @param channel WebSocket连接通道 */ public void addUserConnection(String appUserId, Channel channel) { String channelId = channel.id().asLongText(); // 添加到用户连接映射 userConnections.computeIfAbsent(appUserId, k -> ConcurrentHashMap.newKeySet()) .add(channel); // 添加到连接用户映射 channelToUser.put(channelId, appUserId); // 添加到所有连接 allConnections.put(channelId, channel); log.info("用户连接已添加,用户ID: {}, ChannelId: {}, 该用户连接数: {}", appUserId, channelId, userConnections.get(appUserId).size()); } /** * 移除用户连接 * @param channel WebSocket连接通道 */ public void removeUserConnection(Channel channel) { String channelId = channel.id().asLongText(); String appUserId = channelToUser.get(channelId); if (appUserId != null) { // 从用户连接中移除 Set userChannels = userConnections.get(appUserId); if (userChannels != null) { userChannels.remove(channel); // 如果用户没有连接了,删除用户映射 if (userChannels.isEmpty()) { userConnections.remove(appUserId); } } // 从连接用户映射中移除 channelToUser.remove(channelId); log.info("用户连接已移除,用户ID: {}, ChannelId: {}, 该用户剩余连接数: {}", appUserId, channelId, userChannels != null ? userChannels.size() : 0); } // 从所有连接中移除 allConnections.remove(channelId); } /** * 向指定用户的所有设备发送消息 * @param appUserId 用户ID * @param message 消息内容 * @return 成功发送的设备数 */ public int sendMessageToUser(String appUserId, String message) { Set userChannels = userConnections.get(appUserId); if (userChannels == null || userChannels.isEmpty()) { log.warn("用户 {} 没有活跃连接", appUserId); return 0; } int successCount = 0; for (Channel channel : userChannels) { if (channel != null && channel.isActive()) { try { channel.writeAndFlush(new TextWebSocketFrame(message)); successCount++; } catch (Exception e) { log.error("向用户 {} 发送消息失败,连接ID: {}, 错误: {}", appUserId, channel.id().asLongText(), e.getMessage()); } } } log.info("向用户 {} 发送消息完成,成功发送到 {} 个设备,消息内容: {}", appUserId, successCount, message); return successCount; } /** * 向指定连接发送消息 * @param channelId 连接ID * @param message 消息内容 * @return 是否发送成功 */ public boolean sendMessageToChannel(String channelId, String message) { Channel channel = allConnections.get(channelId); if (channel != null && channel.isActive()) { try { channel.writeAndFlush(new TextWebSocketFrame(message)); log.info("消息已发送到连接: {}, 内容: {}", channelId, message); return true; } catch (Exception e) { log.error("发送消息失败,连接ID: {}, 错误: {}", channelId, e.getMessage()); return false; } } else { log.warn("连接不存在或已关闭,ChannelId: {}", channelId); return false; } } /** * 向所有连接广播消息 * @param message 消息内容 * @return 成功发送的连接数 */ public int broadcastMessage(String message) { int successCount = 0; for (Channel channel : allConnections.values()) { if (channel != null && channel.isActive()) { try { channel.writeAndFlush(new TextWebSocketFrame(message)); successCount++; } catch (Exception e) { log.error("广播消息失败,连接ID: {}, 错误: {}", channel.id().asLongText(), e.getMessage()); } } } log.info("广播消息完成,成功发送到 {} 个连接,消息内容: {}", successCount, message); return successCount; } /** * 向多个用户发送消息 * @param appUserIds 用户ID列表 * @param message 消息内容 * @return 成功发送的用户数 */ public int sendMessageToUsers(List appUserIds, String message) { int successUserCount = 0; for (String appUserId : appUserIds) { int deviceCount = sendMessageToUser(appUserId, message); if (deviceCount > 0) { successUserCount++; } } log.info("向 {} 个用户发送消息完成,成功发送到 {} 个用户", appUserIds.size(), successUserCount); return successUserCount; } /** * 获取用户的所有连接 * @param appUserId 用户ID * @return 连接列表 */ public Set getUserConnections(String appUserId) { return userConnections.getOrDefault(appUserId, Collections.emptySet()); } /** * 获取用户连接数 * @param appUserId 用户ID * @return 连接数 */ public int getUserConnectionCount(String appUserId) { Set userChannels = userConnections.get(appUserId); return userChannels != null ? userChannels.size() : 0; } /** * 获取总连接数 * @return 总连接数 */ public int getTotalConnectionCount() { return allConnections.size(); } /** * 获取在线用户数 * @return 在线用户数 */ public int getOnlineUserCount() { return userConnections.size(); } /** * 获取所有在线用户ID * @return 用户ID集合 */ public Set getOnlineUserIds() { return userConnections.keySet(); } /** * 检查用户是否在线 * @param appUserId 用户ID * @return 是否在线 */ public boolean isUserOnline(String appUserId) { Set userChannels = userConnections.get(appUserId); if (userChannels == null || userChannels.isEmpty()) { return false; } // 检查是否有活跃连接 return userChannels.stream().anyMatch(Channel::isActive); } /** * 检查连接是否存在 * @param channelId 连接ID * @return 是否存在 */ public boolean isConnectionExists(String channelId) { Channel channel = allConnections.get(channelId); return channel != null && channel.isActive(); } /** * 根据连接ID获取用户ID * @param channelId 连接ID * @return 用户ID */ public String getUserIdByChannelId(String channelId) { return channelToUser.get(channelId); } /** * 获取连接统计信息 * @return 统计信息 */ public Map getConnectionStats() { Map stats = new HashMap<>(); stats.put("totalConnections", getTotalConnectionCount()); stats.put("onlineUsers", getOnlineUserCount()); stats.put("onlineUserIds", getOnlineUserIds()); // 统计每个用户的连接数 Map userConnectionCounts = new HashMap<>(); for (Map.Entry> entry : userConnections.entrySet()) { userConnectionCounts.put(entry.getKey(), entry.getValue().size()); } stats.put("userConnectionCounts", userConnectionCounts); return stats; } }