package com.ruoyi.web.controller.webSocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.time.LocalDateTime;

/**
 * Inbound handler for WebSocket text frames ({@link TextWebSocketFrame}).
 *
 * <p>Annotated {@code @Sharable} so that a single instance can be installed in the
 * pipelines of multiple connections. Incoming frames are expected to be JSON objects
 * carrying a {@code type} field:
 * <ul>
 *   <li>{@code "auth"} - authenticate and bind the channel to an app user;</li>
 *   <li>{@code "ping"} - heartbeat, answered with a {@code "pong"};</li>
 *   <li>anything else - treated as a business message.</li>
 * </ul>
 * Frames that cannot be parsed as a JSON object are echoed back as plain text.
 */
@Component
@Slf4j
@ChannelHandler.Sharable
public class WebSocketTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private WebSocketUserConnectionManager userConnectionManager;

    @Autowired
    private WebSocketAuthHandler authHandler;

    /**
     * Parses the incoming frame and routes it by its {@code type} field.
     *
     * <p>Only the JSON parsing step falls back to the plain-text echo. A failure
     * inside one of the message handlers is logged as an error instead of being
     * misreported as "not JSON".
     *
     * @param ctx the channel context of the connection that sent the frame
     * @param msg the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String message = msg.text();
        // NOTE(review): auth frames carry the client's token, so this line writes the
        // token to the log at INFO level - consider masking or lowering the level.
        log.info("[服务器] : 收到消息 -> {}", message);

        JSONObject messageObj = parseJsonObject(message);
        if (messageObj == null) {
            // Not a JSON object: handle as plain text and echo it back with the server time.
            log.debug("消息不是JSON格式,按普通文本处理: {}", message);
            ctx.channel().writeAndFlush(
                    new TextWebSocketFrame("服务器时间: " + LocalDateTime.now() + "->" + message));
            return;
        }

        try {
            dispatch(ctx, messageObj);
        } catch (Exception e) {
            // Keep the connection open, but surface the real failure with its stack trace.
            log.error("处理WebSocket消息时发生错误, ChannelId: {}", ctx.channel().id().asLongText(), e);
        }
    }

    /**
     * Attempts to parse {@code message} as a JSON object.
     *
     * @param message raw frame text
     * @return the parsed object, or {@code null} when the parser rejects the text or
     *         yields no object
     */
    private JSONObject parseJsonObject(String message) {
        try {
            return JSON.parseObject(message);
        } catch (RuntimeException ignored) {
            // Deliberately ignored: the caller treats null as "plain text" and logs it.
            return null;
        }
    }

    /**
     * Routes a parsed message to the handler matching its {@code type} field.
     */
    private void dispatch(ChannelHandlerContext ctx, JSONObject messageObj) {
        String messageType = messageObj.getString("type");
        if ("auth".equals(messageType)) {
            handleUserAuth(ctx, messageObj);
        } else if ("ping".equals(messageType)) {
            handlePing(ctx, messageObj);
        } else {
            handleBusinessMessage(ctx, messageObj);
        }
    }

    /**
     * Handles an {@code auth} message: extracts the credentials, authenticates the
     * user and, on success, registers the channel under that user.
     *
     * <p>Replies with {@code auth_success} or {@code auth_failed}.
     */
    private void handleUserAuth(ChannelHandlerContext ctx, JSONObject messageObj) {
        try {
            WebSocketAuthHandler.UserAuthInfo authInfo = authHandler.extractAuthInfo(messageObj);
            String appUserId = authInfo.getAppUserId();
            String token = authInfo.getToken();

            if (!StringUtils.hasText(appUserId)) {
                sendAuthFailureResponse(ctx, "用户ID不能为空");
                return;
            }

            WebSocketAuthHandler.AuthResult authResult = authHandler.authenticateUser(appUserId, token);
            String channelId = ctx.channel().id().asLongText();

            if (!authResult.isSuccess()) {
                sendAuthFailureResponse(ctx, authResult.getMessage());
                log.warn("用户 {} 认证失败: {}, ChannelId: {}", appUserId, authResult.getMessage(), channelId);
                return;
            }

            // Authenticated: bind this channel to the user before acknowledging.
            userConnectionManager.addUserConnection(appUserId, ctx.channel());

            JSONObject response = newResponse("auth_success");
            response.put("appUserId", appUserId);
            response.put("nickName", authResult.getAppUser().getNickName());
            response.put("message", "认证成功");
            response.put("timestamp", LocalDateTime.now().toString());
            send(ctx, response);

            log.info("用户 {} ({}) 认证成功,ChannelId: {}",
                    appUserId, authResult.getAppUser().getNickName(), channelId);
        } catch (Exception e) {
            log.error("处理用户认证时发生错误", e);
            // Do not leak internal exception details to a client that is not authenticated yet;
            // the full cause is available in the server log above.
            sendAuthFailureResponse(ctx, "认证过程中发生错误");
        }
    }

    /**
     * Sends an {@code auth_failed} response carrying the given reason.
     *
     * @param ctx     the channel context to reply on
     * @param message human-readable failure reason sent to the client
     */
    private void sendAuthFailureResponse(ChannelHandlerContext ctx, String message) {
        JSONObject response = newResponse("auth_failed");
        response.put("message", message);
        response.put("timestamp", LocalDateTime.now().toString());
        send(ctx, response);
    }

    /**
     * Handles a heartbeat message by replying with {@code pong}. The user id bound to
     * the channel (as reported by the connection manager) is included in the reply.
     */
    private void handlePing(ChannelHandlerContext ctx, JSONObject messageObj) {
        String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());

        JSONObject response = newResponse("pong");
        response.put("appUserId", appUserId);
        response.put("timestamp", LocalDateTime.now().toString());
        send(ctx, response);
    }

    /**
     * Handles a business message. Channels with a bound user get a
     * {@code message_response} echoing the original message; channels without one
     * are told to authenticate first via {@code auth_required}.
     */
    private void handleBusinessMessage(ChannelHandlerContext ctx, JSONObject messageObj) {
        String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());

        if (appUserId == null) {
            log.warn("收到未认证连接的业务消息: {}", messageObj.toJSONString());

            JSONObject authRequired = newResponse("auth_required");
            authRequired.put("message", "请先进行用户认证");
            authRequired.put("timestamp", LocalDateTime.now().toString());
            send(ctx, authRequired);
            return;
        }

        log.info("收到用户 {} 的业务消息: {}", appUserId, messageObj.toJSONString());

        JSONObject response = newResponse("message_response");
        response.put("appUserId", appUserId);
        response.put("originalMessage", messageObj);
        response.put("serverTime", LocalDateTime.now().toString());
        send(ctx, response);
    }

    /**
     * Creates a response object pre-populated with its {@code type} field.
     */
    private static JSONObject newResponse(String type) {
        JSONObject response = new JSONObject();
        response.put("type", type);
        return response;
    }

    /**
     * Serializes {@code response} and writes it to the channel as a text frame.
     */
    private static void send(ChannelHandlerContext ctx, JSONObject response) {
        ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
    }

    /**
     * Logs the channel id when this handler is added to a pipeline.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // The long-text id is unique.
        log.info("handlerAdded 被调用: {}", ctx.channel().id().asLongText());
        // The short-text id may collide.
        log.info("handlerAdded 被调用: {}", ctx.channel().id().asShortText());
        // Note: the user connection is registered during authentication, not here.
    }

    /**
     * Logs the channel id and unregisters the channel from the connection manager
     * when this handler is removed from a pipeline.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // The long-text id is unique.
        log.info("handlerRemoved 被调用: {}", ctx.channel().id().asLongText());
        // The short-text id may collide.
        log.info("handlerRemoved 被调用: {}", ctx.channel().id().asShortText());

        // Drop the channel from the user connection manager.
        if (userConnectionManager != null) {
            userConnectionManager.removeUserConnection(ctx.channel());
        }
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("WebSocket连接异常: {}", cause.getMessage(), cause);
        ctx.channel().close();
    }
}