zhibing.pu
2024-08-21 f10ef4933c6decda69308c1cf6e4d0449856ba1f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package com.ruoyi.other.webSocket;
 
 
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 
import java.util.Hashtable;
 
public class NettyWebSocketController {
    
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
    
    
    public static Hashtable<String, String> table;
    public static int i = 0;
 
    static {
        if (table == null) {
            table = new Hashtable<>();
        }
    }
    
    /**
     * 向客户端发送消息
     *
     * @param ctx
     * @param msg
     * @author TaoNingBo
     */
    public static R sendMsgToClient(ChannelHandlerContext ctx, String msg) {
        if (ctx != null && ctx.channel().isActive()) {
            ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
            ChannelFuture sync;
            try {
                sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
                if (!sync.isSuccess()) {
                    boolean b = true;
                    for (int i = 0; i < 10; i++) {
                        ctx.wait(3000);
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        if (sync.isSuccess()) {
                            b = false;
                            break;
                        }
                        System.err.println("小程序-》推送不成功,将继续推送" + msg);
                    }
                    if (b) {
                        NettyChannelMap.remove(ctx);
                        return R.fail("无效的消息通道");
                    }
                }
                return R.ok();
            } catch (Exception e) {
                NettyChannelMap.remove(ctx);
                e.printStackTrace();
                return R.fail("发送消息失败:" + e.getMessage());
            }
        } else {
            NettyChannelMap.remove(ctx);
            return R.fail("无效的消息通道");
        }
    }
    
    //    **链接断开 将推送消息记录
    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
        ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
        if (ctx != null) {
            ChannelFuture sync;
            try {
                sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                if (!sync.isSuccess()) {
                    for (int i = 0; i < 10; i++) {
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        ;
                        if (!sync.isSuccess()) {
                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                            ;
                            System.err.println("推送不成功,将继续推送" + msg);
                            if (i == 9) {
                                table.put(cacheType + id, msg);
                                ctx.close();
                                System.err.println("推送发生异常,记录:" + msg);
                            }
                        } else {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                table.put(cacheType + id, msg);
                System.err.println("推送发生异常,记录:" + msg);
            }
        } else {
            table.put(cacheType + id, msg);
            System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
        }
    }
    
    /**
     * 判断客户端要执行什么操作
     *
     * @param ctx
     * @param msg
     * @author TaoNingBo
     */
    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
        try {
            // 验证即时通讯命令是否正确有效
            if (StringUtils.isEmpty(msg)) {
                return;
            }
            String msgStr = msg.toString();
            if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
                return;
            }
            
            // 获取socket信息,保存相应的socket
            JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
            int code = jsonMsg.getIntValue("code");
            String message = jsonMsg.getString("msg");
            String method = jsonMsg.getString("method");
            if (code != 200 || !message.equals("SUCCESS")) {
                return;
            }
            JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
            
            if (null != ctx && ctx.channel().isActive()) {
                jsonMsg.put("method", Method.pong);
                sendMsgToClient(ctx, jsonMsg.toJSONString());
            }
            
            
            // ############################### 心跳  ############################
            // 心跳
            if (method.equals(Method.ping)) {
                String userId1 = jsonCon.getString("userId");
                if (StringUtils.isNotEmpty(userId1)) {
                    //存储业务使用的通道
                    if (null != ctx && ctx.channel().isActive()) {
                        NettyChannelMap.update("Applets" + userId1, ctx);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}