package com.ruoyi.other.webSocket; import com.alibaba.fastjson.JSONObject; import com.ruoyi.common.core.domain.R; import com.ruoyi.common.core.utils.StringUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.util.Hashtable; public class NettyWebSocketController { public static Hashtable> map = new Hashtable>(); public static Hashtable table; public static int i = 0; static { if (table == null) { table = new Hashtable<>(); } } /** * 向客户端发送消息 * * @param ctx * @param msg * @author TaoNingBo */ public static R sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; try { sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); if (!sync.isSuccess()) { boolean b = true; for (int i = 0; i < 10; i++) { ctx.wait(3000); sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); if (sync.isSuccess()) { b = false; break; } System.err.println("小程序-》推送不成功,将继续推送" + msg); } if (b) { NettyChannelMap.remove(ctx); return R.fail("无效的消息通道"); } } return R.ok(); } catch (Exception e) { NettyChannelMap.remove(ctx); e.printStackTrace(); return R.fail("发送消息失败:" + e.getMessage()); } } else { NettyChannelMap.remove(ctx); return R.fail("无效的消息通道"); } } // **链接断开 将推送消息记录 public static void sendMsgToClient(String cacheType, Integer id, String msg) { ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); if (ctx != null) { ChannelFuture sync; try { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); if (!sync.isSuccess()) { for (int i = 0; i < 10; i++) { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); ; if (!sync.isSuccess()) { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); ; System.err.println("推送不成功,将继续推送" + msg); if (i == 9) { table.put(cacheType + id, msg); ctx.close(); System.err.println("推送发生异常,记录:" + msg); } } else { break; } } } } catch (Exception e) { table.put(cacheType + id, msg); System.err.println("推送发生异常,记录:" + msg); } } else { table.put(cacheType + id, msg); System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); } } /** * 判断客户端要执行什么操作 * * @param ctx * @param msg * @author TaoNingBo */ public void JudgeOperation(ChannelHandlerContext ctx, String msg) { try { // 验证即时通讯命令是否正确有效 if (StringUtils.isEmpty(msg)) { return; } String msgStr = msg.toString(); if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { return; } // 获取socket信息,保存相应的socket JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); int code = jsonMsg.getIntValue("code"); String message = jsonMsg.getString("msg"); String method = jsonMsg.getString("method"); if (code != 200 || !message.equals("SUCCESS")) { return; } JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); if (null != ctx && ctx.channel().isActive()) { jsonMsg.put("method", Method.pong); sendMsgToClient(ctx, jsonMsg.toJSONString()); } // ############################### 心跳 ############################ // 心跳 if (method.equals(Method.ping)) { String userId1 = jsonCon.getString("userId"); if (StringUtils.isNotEmpty(userId1)) { //存储业务使用的通道 if (null != ctx && ctx.channel().isActive()) { NettyChannelMap.update("Applets" + userId1, ctx); } } } } catch (Exception e) { e.printStackTrace(); } } }