xuhy
1 天以前 a960c432d78dfe5f0ef07295d0210ddb09340e12
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package com.ruoyi.web.controller.webSocket;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
 
import java.time.LocalDateTime;
 
/**
 * TextWebSocketFrame 表示一个文本桢
 * 使用@Sharable注解使其可以被多个连接共享
 * 支持基于AppUser的连接管理
 */
@Component
@Slf4j
@ChannelHandler.Sharable
public class WebSocketTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 
 
    @Autowired
    private WebSocketUserConnectionManager userConnectionManager;
    
    @Autowired
    private WebSocketAuthHandler authHandler;
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String message = msg.text();
        log.info("[服务器] : 收到消息 -> {}", message);
        
        try {
            // 尝试解析消息,检查是否包含用户认证信息
            JSONObject messageObj = JSON.parseObject(message);
            String messageType = messageObj.getString("type");
            
            if ("auth".equals(messageType)) {
                // 处理用户认证消息
                handleUserAuth(ctx, messageObj);
            } else if ("ping".equals(messageType)) {
                // 处理心跳消息
                handlePing(ctx, messageObj);
            } else {
                // 处理普通业务消息
                handleBusinessMessage(ctx, messageObj);
            }
        } catch (Exception e) {
            // 如果不是JSON格式,按普通文本处理
            log.debug("消息不是JSON格式,按普通文本处理: {}", message);
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now() + "->" + message));
        }
    }
    
    /**
     * 处理用户认证消息
     */
    private void handleUserAuth(ChannelHandlerContext ctx, JSONObject messageObj) {
        try {
            // 提取认证信息
            WebSocketAuthHandler.UserAuthInfo authInfo = authHandler.extractAuthInfo(messageObj);
            String appUserId = authInfo.getAppUserId();
            String token = authInfo.getToken();
            
            if (!StringUtils.hasText(appUserId)) {
                sendAuthFailureResponse(ctx, "用户ID不能为空");
                return;
            }
            
            // 进行用户认证
            WebSocketAuthHandler.AuthResult authResult = authHandler.authenticateUser(appUserId, token);
            
            if (authResult.isSuccess()) {
                // 认证成功,将连接与用户关联
                userConnectionManager.addUserConnection(appUserId, ctx.channel());
                
                // 发送认证成功响应
                JSONObject response = new JSONObject();
                response.put("type", "auth_success");
                response.put("appUserId", appUserId);
                response.put("nickName", authResult.getAppUser().getNickName());
                response.put("message", "认证成功");
                response.put("timestamp", LocalDateTime.now().toString());
                
                ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
                log.info("用户 {} ({}) 认证成功,ChannelId: {}", 
                        appUserId, authResult.getAppUser().getNickName(), ctx.channel().id().asLongText());
            } else {
                // 认证失败
                sendAuthFailureResponse(ctx, authResult.getMessage());
                log.warn("用户 {} 认证失败: {}, ChannelId: {}", 
                        appUserId, authResult.getMessage(), ctx.channel().id().asLongText());
            }
        } catch (Exception e) {
            log.error("处理用户认证时发生错误", e);
            sendAuthFailureResponse(ctx, "认证过程中发生错误: " + e.getMessage());
        }
    }
    
    /**
     * 发送认证失败响应
     */
    private void sendAuthFailureResponse(ChannelHandlerContext ctx, String message) {
        JSONObject response = new JSONObject();
        response.put("type", "auth_failed");
        response.put("message", message);
        response.put("timestamp", LocalDateTime.now().toString());
        
        ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
    }
    
    /**
     * 处理心跳消息
     */
    private void handlePing(ChannelHandlerContext ctx, JSONObject messageObj) {
        String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());
        
        JSONObject response = new JSONObject();
        response.put("type", "pong");
        response.put("appUserId", appUserId);
        response.put("timestamp", LocalDateTime.now().toString());
        
        ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
    }
    
    /**
     * 处理业务消息
     */
    private void handleBusinessMessage(ChannelHandlerContext ctx, JSONObject messageObj) {
        String appUserId = userConnectionManager.getUserIdByChannelId(ctx.channel().id().asLongText());
        
        if (appUserId != null) {
            log.info("收到用户 {} 的业务消息: {}", appUserId, messageObj.toJSONString());
            
            // 回复消息
            JSONObject response = new JSONObject();
            response.put("type", "message_response");
            response.put("appUserId", appUserId);
            response.put("originalMessage", messageObj);
            response.put("serverTime", LocalDateTime.now().toString());
            
            ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
        } else {
            log.warn("收到未认证连接的业务消息: {}", messageObj.toJSONString());
            
            // 要求先认证
            JSONObject response = new JSONObject();
            response.put("type", "auth_required");
            response.put("message", "请先进行用户认证");
            response.put("timestamp", LocalDateTime.now().toString());
            
            ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
        }
    }
 
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // id 表示唯一的值 LongText是唯一的
        log.info("handlerAdded 被调用: {}", ctx.channel().id().asLongText());
        // shortText 可能会重复
        log.info("handlerAdded 被调用: {}", ctx.channel().id().asShortText());
 
        // 注意:用户连接管理在认证时添加,这里不添加
    }
 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // id 表示唯一的值 LongText是唯一的
        log.info("handlerRemoved 被调用: {}", ctx.channel().id().asLongText());
        // shortText 可能会重复
        log.info("handlerRemoved 被调用: {}", ctx.channel().id().asShortText());
        
        // 从用户连接管理器中移除连接
        if (userConnectionManager != null) {
            userConnectionManager.removeUserConnection(ctx.channel());
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("WebSocket连接异常: {}", cause.getMessage(), cause);
        ctx.channel().close();
    }
}