From 243385177a14cdcaabc12f47e7fd5c17bfe8f56b Mon Sep 17 00:00:00 2001 From: mitao <2763622819@qq.com> Date: 星期一, 27 五月 2024 09:25:34 +0800 Subject: [PATCH] 提交websocket相关代码 --- ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/SemaphoreUtils.java | 59 +++++++++ ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketConfig.java | 20 +++ ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketServer.java | 97 ++++++++++++++++ ruoyi-api/ruoyi-api-system/src/main/java/util/WebSocketUsers.java | 140 +++++++++++++++++++++++ ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/WebSocketTestController.java | 16 ++ 5 files changed, 332 insertions(+), 0 deletions(-) diff --git a/ruoyi-api/ruoyi-api-system/src/main/java/util/WebSocketUsers.java b/ruoyi-api/ruoyi-api-system/src/main/java/util/WebSocketUsers.java new file mode 100644 index 0000000..407f505 --- /dev/null +++ b/ruoyi-api/ruoyi-api-system/src/main/java/util/WebSocketUsers.java @@ -0,0 +1,140 @@ +package util; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.websocket.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * websocket 客户端用户集 + * + * @author ruoyi + */ +public class WebSocketUsers +{ + /** + * WebSocketUsers 日志控制器 + */ + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class); + + /** + * 用户集 + */ + private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>(); + + /** + * 存储用户 + * + * @param key 唯一键 + * @param session 用户信息 + */ + public static void put(String key, Session session) + { + USERS.put(key, session); + } + + /** + * 移除用户 + * + * @param session 用户信息 + * + * @return 移除结果 + */ + public static boolean remove(Session session) + { + String key = null; + boolean flag = USERS.containsValue(session); + if (flag) + { + Set<Map.Entry<String, Session>> entries = USERS.entrySet(); + for (Map.Entry<String, Session> entry : entries) + { + Session value = entry.getValue(); + if (value.equals(session)) + { + key = entry.getKey(); + break; + } + } + } + else + { + return true; + } + return remove(key); + } + + /** + * 移出用户 + * + * @param key 键 + */ + public static boolean remove(String key) + { + LOGGER.info("\n 正在移出用户 - {}", key); + Session remove = USERS.remove(key); + if (remove != null) + { + boolean containsValue = USERS.containsValue(remove); + LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功"); + return containsValue; + } + else + { + return true; + } + } + + /** + * 获取在线用户列表 + * + * @return 返回用户集合 + */ + public static Map<String, Session> getUsers() + { + return USERS; + } + + /** + * 群发消息文本消息 + * + * @param message 消息内容 + */ + public static void sendMessageToUsersByText(String message) + { + Collection<Session> values = USERS.values(); + for (Session value : values) + { + sendMessageToUserByText(value, message); + } + } + + /** + * 发送文本消息 + * + * @param session 自己的用户名 + * @param message 消息内容 + */ + public static void sendMessageToUserByText(Session session, String message) + { + if (session != null) + { + try + { + session.getBasicRemote().sendText(message); + } + catch (IOException e) + { + LOGGER.error("\n[发送消息异常]", e); + } + } + else + { + LOGGER.info("\n[你已离线]"); + } + } +} diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/WebSocketTestController.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/WebSocketTestController.java new file mode 100644 index 0000000..c7cdf7f --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/controller/WebSocketTestController.java @@ -0,0 +1,16 @@ +package com.ruoyi.system.controller; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import util.WebSocketUsers; + +@RestController +@RequestMapping("/websocket") +public class WebSocketTestController { + + @GetMapping("/push") + public void push() { + WebSocketUsers.sendMessageToUsersByText("长江长江,我是黄河!"); + } +} \ No newline at end of file diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/SemaphoreUtils.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/SemaphoreUtils.java new file mode 100644 index 0000000..545621a --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/SemaphoreUtils.java @@ -0,0 +1,59 @@ +package com.ruoyi.system.websocket; + +import java.util.concurrent.Semaphore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 信号量相关处理 + * + * @author ruoyi + */ +public class SemaphoreUtils +{ + + /** + * SemaphoreUtils 日志控制器 + */ + private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class); + + /** + * 获取信号量 + * + * @param semaphore + * @return + */ + public static boolean tryAcquire(Semaphore semaphore) + { + boolean flag = false; + + try + { + flag = semaphore.tryAcquire(); + } + catch (Exception e) + { + LOGGER.error("获取信号量异常", e); + } + + return flag; + } + + /** + * 释放信号量 + * + * @param semaphore + */ + public static void release(Semaphore semaphore) + { + + try + { + semaphore.release(); + } + catch (Exception e) + { + LOGGER.error("释放信号量异常", e); + } + } +} diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketConfig.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketConfig.java new file mode 100644 index 0000000..e7c191a --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketConfig.java @@ -0,0 +1,20 @@ +package com.ruoyi.system.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * websocket 配置 + * + * @author ruoyi + */ +@Configuration +public class WebSocketConfig +{ + @Bean + public ServerEndpointExporter serverEndpointExporter() + { + return new ServerEndpointExporter(); + } +} diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketServer.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketServer.java new file mode 100644 index 0000000..306b0b4 --- /dev/null +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/websocket/WebSocketServer.java @@ -0,0 +1,97 @@ +package com.ruoyi.system.websocket; + +import java.util.concurrent.Semaphore; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.ServerEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import util.WebSocketUsers; + +/** + * websocket 消息处理 + * + * @author ruoyi + */ +@Component +@ServerEndpoint("/websocket/message") +public class WebSocketServer { + + /** + * WebSocketServer 日志控制器 + */ + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class); + + /** + * 默认最多允许同时在线人数100 + */ + public static int socketMaxOnlineCount = 100; + + private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount); + + /** + * 连接建立成功调用的方法 + */ + @OnOpen + public void onOpen(Session session) throws Exception { + boolean semaphoreFlag = false; + // 尝试获取信号量 + semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore); + if (!semaphoreFlag) { + // 未获取到信号量 + LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount); + WebSocketUsers.sendMessageToUserByText(session, + "当前在线人数超过限制数:" + socketMaxOnlineCount); + session.close(); + } else { + // 添加用户 + WebSocketUsers.put(session.getId(), session); + LOGGER.info("\n 建立连接 - {}", session); + LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size()); + WebSocketUsers.sendMessageToUserByText(session, "连接成功"); + } + } + + /** + * 连接关闭时处理 + */ + @OnClose + public void onClose(Session session) { + LOGGER.info("\n 关闭连接 - {}", session); + // 移除用户 + WebSocketUsers.remove(session.getId()); + // 获取到信号量则需释放 + SemaphoreUtils.release(socketSemaphore); + } + + /** + * 抛出异常时处理 + */ + @OnError + public void onError(Session session, Throwable exception) throws Exception { + if (session.isOpen()) { + // 关闭连接 + session.close(); + } + String sessionId = session.getId(); + LOGGER.info("\n 连接异常 - {}", sessionId); + LOGGER.info("\n 异常信息 - {}", exception); + // 移出用户 + WebSocketUsers.remove(sessionId); + // 获取到信号量则需释放 + SemaphoreUtils.release(socketSemaphore); + } + + /** + * 服务器接收到客户端消息时调用的方法 + */ + @OnMessage + public void onMessage(String message, Session session) { + String msg = message.replace("你", "我").replace("吗", ""); + WebSocketUsers.sendMessageToUserByText(session, msg); + } +} -- Gitblit v1.7.1