| package com.ruoyi.other.webSocket; | 
|   | 
|   | 
| import com.alibaba.fastjson.JSONObject; | 
| import com.ruoyi.common.core.domain.R; | 
| import com.ruoyi.common.core.utils.StringUtils; | 
| import io.netty.buffer.ByteBuf; | 
| import io.netty.buffer.Unpooled; | 
| import io.netty.channel.ChannelFuture; | 
| import io.netty.channel.ChannelHandlerContext; | 
| import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; | 
|   | 
| import java.util.Hashtable; | 
|   | 
| public class NettyWebSocketController { | 
|      | 
|     public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); | 
|      | 
|      | 
|     public static Hashtable<String, String> table; | 
|     public static int i = 0; | 
|   | 
|     static { | 
|         if (table == null) { | 
|             table = new Hashtable<>(); | 
|         } | 
|     } | 
|      | 
|     /** | 
|      * 向客户端发送消息 | 
|      * | 
|      * @param ctx | 
|      * @param msg | 
|      * @author TaoNingBo | 
|      */ | 
|     public static R sendMsgToClient(ChannelHandlerContext ctx, String msg) { | 
|         if (ctx != null && ctx.channel().isActive()) { | 
|             ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); | 
|             ChannelFuture sync; | 
|             try { | 
|                 sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); | 
|                 if (!sync.isSuccess()) { | 
|                     boolean b = true; | 
|                     for (int i = 0; i < 10; i++) { | 
|                         ctx.wait(3000); | 
|                         sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); | 
|                         if (sync.isSuccess()) { | 
|                             b = false; | 
|                             break; | 
|                         } | 
|                         System.err.println("小程序-》推送不成功,将继续推送" + msg); | 
|                     } | 
|                     if (b) { | 
|                         NettyChannelMap.remove(ctx); | 
|                         return R.fail("无效的消息通道"); | 
|                     } | 
|                 } | 
|                 return R.ok(); | 
|             } catch (Exception e) { | 
|                 NettyChannelMap.remove(ctx); | 
|                 e.printStackTrace(); | 
|                 return R.fail("发送消息失败:" + e.getMessage()); | 
|             } | 
|         } else { | 
|             NettyChannelMap.remove(ctx); | 
|             return R.fail("无效的消息通道"); | 
|         } | 
|     } | 
|      | 
|     //    **链接断开 将推送消息记录 | 
|     public static void sendMsgToClient(String cacheType, Integer id, String msg) { | 
|         ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); | 
|         if (ctx != null) { | 
|             ChannelFuture sync; | 
|             try { | 
|                 sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); | 
|                 if (!sync.isSuccess()) { | 
|                     for (int i = 0; i < 10; i++) { | 
|                         sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); | 
|                         ; | 
|                         if (!sync.isSuccess()) { | 
|                             sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); | 
|                             ; | 
|                             System.err.println("推送不成功,将继续推送" + msg); | 
|                             if (i == 9) { | 
|                                 table.put(cacheType + id, msg); | 
|                                 ctx.close(); | 
|                                 System.err.println("推送发生异常,记录:" + msg); | 
|                             } | 
|                         } else { | 
|                             break; | 
|                         } | 
|                     } | 
|                 } | 
|             } catch (Exception e) { | 
|                 table.put(cacheType + id, msg); | 
|                 System.err.println("推送发生异常,记录:" + msg); | 
|             } | 
|         } else { | 
|             table.put(cacheType + id, msg); | 
|             System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); | 
|         } | 
|     } | 
|      | 
|     /** | 
|      * 判断客户端要执行什么操作 | 
|      * | 
|      * @param ctx | 
|      * @param msg | 
|      * @author TaoNingBo | 
|      */ | 
|     public void JudgeOperation(ChannelHandlerContext ctx, String msg) { | 
|         try { | 
|             // 验证即时通讯命令是否正确有效 | 
|             if (StringUtils.isEmpty(msg)) { | 
|                 return; | 
|             } | 
|             String msgStr = msg.toString(); | 
|             if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { | 
|                 return; | 
|             } | 
|              | 
|             // 获取socket信息,保存相应的socket | 
|             JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); | 
|             int code = jsonMsg.getIntValue("code"); | 
|             String message = jsonMsg.getString("msg"); | 
|             String method = jsonMsg.getString("method"); | 
|             if (code != 200 || !message.equals("SUCCESS")) { | 
|                 return; | 
|             } | 
|             JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); | 
|              | 
|             if (null != ctx && ctx.channel().isActive()) { | 
|                 jsonMsg.put("method", Method.pong); | 
|                 sendMsgToClient(ctx, jsonMsg.toJSONString()); | 
|             } | 
|              | 
|              | 
|             // ############################### 心跳  ############################ | 
|             // 心跳 | 
|             if (method.equals(Method.ping)) { | 
|                 String userId1 = jsonCon.getString("userId"); | 
|                 if (StringUtils.isNotEmpty(userId1)) { | 
|                     //存储业务使用的通道 | 
|                     if (null != ctx && ctx.channel().isActive()) { | 
|                         NettyChannelMap.update("Applets" + userId1, ctx); | 
|                     } | 
|                 } | 
|             } | 
|         } catch (Exception e) { | 
|             e.printStackTrace(); | 
|         } | 
|     } | 
| } |