package com.ruoyi.web.controller.webSocket;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.StringUtils;
|
|
import java.time.LocalDateTime;
|
|
/**
|
* TextWebSocketFrame 表示一个文本桢
|
* 使用@Sharable注解使其可以被多个连接共享
|
* 支持基于AppUser的连接管理
|
*/
|
@Component
|
@Slf4j
|
@ChannelHandler.Sharable
|
public class WebSocketTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
|
|
@Autowired
|
private WebSocketUserConnectionManager userConnectionManager;
|
|
@Autowired
|
private WebSocketAuthHandler authHandler;
|
|
@Override
|
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
|
String message = msg.text();
|
log.info("[服务器] : 收到消息 -> {}", message);
|
|
try {
|
// 尝试解析消息,检查是否包含用户认证信息
|
JSONObject messageObj = JSON.parseObject(message);
|
String messageType = messageObj.getString("type");
|
|
if ("auth".equals(messageType)) {
|
// 处理用户认证消息
|
handleUserAuth(ctx, messageObj);
|
} else if ("ping".equals(messageType)) {
|
// 处理心跳消息
|
handlePing(ctx, messageObj);
|
} else {
|
// 处理普通业务消息
|
handleBusinessMessage(ctx, messageObj);
|
}
|
} catch (Exception e) {
|
// 如果不是JSON格式,按普通文本处理
|
log.debug("消息不是JSON格式,按普通文本处理: {}", message);
|
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now() + "->" + message));
|
}
|
}
|
|
/**
|
* 处理用户认证消息
|
*/
|
private void handleUserAuth(ChannelHandlerContext ctx, JSONObject messageObj) {
|
try {
|
// 提取认证信息
|
WebSocketAuthHandler.UserAuthInfo authInfo = authHandler.extractAuthInfo(messageObj);
|
String appUserId = authInfo.getAppUserId();
|
String token = authInfo.getToken();
|
|
if (!StringUtils.hasText(appUserId)) {
|
sendAuthFailureResponse(ctx, "用户ID不能为空");
|
return;
|
}
|
|
// 进行用户认证
|
WebSocketAuthHandler.AuthResult authResult = authHandler.authenticateUser(appUserId, token);
|
|
if (authResult.isSuccess()) {
|
// 认证成功,将连接与用户关联
|
userConnectionManager.addUserConnection(appUserId, ctx.channel());
|
|
// 发送认证成功响应
|
JSONObject response = new JSONObject();
|
response.put("type", "auth_success");
|
response.put("appUserId", appUserId);
|
response.put("nickName", authResult.getAppUser().getNickName());
|
response.put("message", "认证成功");
|
response.put("timestamp", LocalDateTime.now().toString());
|
|
ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
|
log.info("用户 {} ({}) 认证成功,ChannelId: {}",
|
appUserId, authResult.getAppUser().getNickName(), ctx.channel().id().asLongText());
|
} else {
|
// 认证失败
|
sendAuthFailureResponse(ctx, authResult.getMessage());
|
log.warn("用户 {} 认证失败: {}, ChannelId: {}",
|
appUserId, authResult.getMessage(), ctx.channel().id().asLongText());
|
}
|
} catch (Exception e) {
|
log.error("处理用户认证时发生错误", e);
|
sendAuthFailureResponse(ctx, "认证过程中发生错误: " + e.getMessage());
|
}
|
}
|
|
/**
|
* 发送认证失败响应
|
*/
|
private void sendAuthFailureResponse(ChannelHandlerContext ctx, String message) {
|
JSONObject response = new JSONObject();
|
response.put("type", "auth_failed");
|
response.put("message", message);
|
response.put("timestamp", LocalDateTime.now().toString());
|
|
ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
|
}
|
|
/**
|
* 处理心跳消息
|
*/
|
private void handlePing(ChannelHandlerContext ctx, JSONObject messageObj) {
|
String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());
|
|
JSONObject response = new JSONObject();
|
response.put("type", "pong");
|
response.put("appUserId", appUserId);
|
response.put("timestamp", LocalDateTime.now().toString());
|
|
ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
|
}
|
|
/**
|
* 处理业务消息
|
*/
|
private void handleBusinessMessage(ChannelHandlerContext ctx, JSONObject messageObj) {
|
String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());
|
|
if (appUserId != null) {
|
log.info("收到用户 {} 的业务消息: {}", appUserId, messageObj.toJSONString());
|
|
// 回复消息
|
JSONObject response = new JSONObject();
|
response.put("type", "message_response");
|
response.put("appUserId", appUserId);
|
response.put("originalMessage", messageObj);
|
response.put("serverTime", LocalDateTime.now().toString());
|
|
ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
|
} else {
|
log.warn("收到未认证连接的业务消息: {}", messageObj.toJSONString());
|
|
// 要求先认证
|
JSONObject response = new JSONObject();
|
response.put("type", "auth_required");
|
response.put("message", "请先进行用户认证");
|
response.put("timestamp", LocalDateTime.now().toString());
|
|
ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
|
}
|
}
|
|
@Override
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
// id 表示唯一的值 LongText是唯一的
|
log.info("handlerAdded 被调用: {}", ctx.channel().id().asLongText());
|
// shortText 可能会重复
|
log.info("handlerAdded 被调用: {}", ctx.channel().id().asShortText());
|
|
// 注意:用户连接管理在认证时添加,这里不添加
|
}
|
|
@Override
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
// id 表示唯一的值 LongText是唯一的
|
log.info("handlerRemoved 被调用: {}", ctx.channel().id().asLongText());
|
// shortText 可能会重复
|
log.info("handlerRemoved 被调用: {}", ctx.channel().id().asShortText());
|
|
// 从用户连接管理器中移除连接
|
if (userConnectionManager != null) {
|
userConnectionManager.removeUserConnection(ctx.channel());
|
}
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
log.error("WebSocket连接异常: {}", cause.getMessage(), cause);
|
ctx.channel().close();
|
}
|
}
|