package cn.mb.cloud.gateway.util.applets;

import cn.mb.cloud.gateway.util.RedisUtil;
import cn.mb.cloud.gateway.util.SinataUtil;
import cn.mb.cloud.gateway.util.SpringUtil;
import cn.mb.cloud.gateway.util.StringUtil;
import cn.mb.cloud.gateway.util.echo.Method;
import cn.mb.cloud.gateway.util.echo.NettyChannelMap;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Hashtable;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Handles JSON text commands received over a WebSocket connection and pushes
 * text frames back to clients.
 *
 * <p>The send helpers block the calling thread until the write completes
 * (they wait on the Netty {@link ChannelFuture}).
 */
public class NettyWebSocketController {

    /** Maximum number of re-sends after a failed first write. */
    private static final int MAX_RETRIES = 10;

    /** Pause between re-sends for the context-based overload, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 3000L;

    /** Delay before a superseded login's channel is dropped, in milliseconds. */
    private static final long OFFLINE_REMOVAL_DELAY_MILLIS = 3000L;

    // NOTE(review): the generic type arguments of this field are not recoverable
    // from the reviewed source; it is declared raw here. Restore the original
    // type arguments from version control.
    public static Hashtable map = new Hashtable();

    private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);

    /**
     * Messages that could not be delivered, keyed by {@code cacheType + id}.
     */
    // NOTE(review): this file only stores String keys and String values here;
    // the declared type arguments look lost as well - confirm and restore.
    public static Hashtable table = new Hashtable<>();

    public static boolean isdebug = false;

    public static int i = 0;

    /**
     * Decides which operation the client requested and performs it.
     *
     * <p>A message is accepted only when it parses as JSON with {@code code == 200}
     * and {@code msg == "SUCCESS"} and carries a {@code data} entry. Every accepted
     * message is echoed back with its {@code method} replaced by {@code Method.pong}.
     * When the original method is {@code Method.ping} the heartbeat handling
     * (single-login check and channel registration) runs as well.
     *
     * @param ctx channel context the message arrived on
     * @param msg raw text of the received frame
     * @author TaoNingBo
     */
    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
        try {
            // Cheap sanity check before parsing: the command must at least look
            // like a JSON object carrying the expected keys.
            if (SinataUtil.isEmpty(msg) || !looksLikeCommand(msg)) {
                return;
            }

            JSONObject jsonMsg = JSONObject.parseObject(msg);
            int code = jsonMsg.getIntValue("code");
            String message = jsonMsg.getString("msg");
            String method = jsonMsg.getString("method");
            // Constant-first equals: a missing "msg" value is rejected instead of
            // raising a NullPointerException.
            if (code != 200 || !"SUCCESS".equals(message)) {
                return;
            }

            Object data = jsonMsg.get("data");
            if (data == null) {
                return;
            }
            JSONObject jsonCon = JSONObject.parseObject(data.toString());

            // Echo the command back as a pong while the connection is alive.
            if (null != ctx && ctx.channel().isActive()) {
                jsonMsg.put("method", Method.pong);
                sendMsgToClient(ctx, jsonMsg.toJSONString());
            }

            // ------------------------------ heartbeat ------------------------------
            if (method == null || !method.equals(Method.ping) || jsonCon == null) {
                return;
            }
            String token = jsonCon.getString("token");
            String userId1 = jsonCon.getString("userId");
            if (StringUtil.isNotEmpty(userId1)) {
                // Make sure the account is logged in on a single device only.
                if (StringUtil.isNotEmpty(token)) {
                    enforceSingleLogin(ctx, userId1, token);
                }
                // Register the channel used for business pushes.
                if (null != ctx && ctx.channel().isActive()) {
                    NettyChannelMap.update("Applets" + userId1, ctx);
                }
            }
        } catch (Exception e) {
            if (isdebug) {
                NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg);
            }
            e.printStackTrace();
        }
    }

    /** Returns whether the text contains the braces and keys every command must have. */
    private static boolean looksLikeCommand(String msg) {
        return msg.contains("{")
                && msg.contains("}")
                && msg.contains("code")
                && msg.contains("msg")
                && msg.contains("data")
                && msg.contains("method");
    }

    /**
     * Registers the channel under the token prefix and, when the token cached in
     * Redis differs from the one presented, tells this connection it is offline
     * and schedules its removal.
     */
    private void enforceSingleLogin(final ChannelHandlerContext ctx, String userId, String token) {
        // Channel used for single sign-on, keyed by the first 23 characters of the
        // token. substring throws for shorter tokens; the caller's catch handles it.
        NettyChannelMap.update_(token.substring(0, 23), ctx);

        // Latest token stored for this user.
        String latestToken = redisUtil.getValue("USER_Applets_" + userId);
        if (StringUtil.isNotEmpty(latestToken) && !token.equals(latestToken)) {
            // Token differs from the cached one: notify this connection.
            JSONObject offline = new JSONObject();
            offline.put("code", 200);
            offline.put("msg", "SUCCESS");
            offline.put("method", "OFFLINE");
            offline.put("data", new Object());
            this.sendMsgToClient(ctx, offline.toJSONString());
            scheduleChannelRemoval(ctx, OFFLINE_REMOVAL_DELAY_MILLIS);
        }
        if (StringUtil.isEmpty(latestToken)) {
            // Covers the case where storing the token at login time failed.
            redisUtil.setStrValue("USER_Applets_" + userId, token);
        }
    }

    /**
     * Removes the single sign-on channel after the given delay.
     *
     * <p>The timer is cancelled from inside the task: {@link Timer#cancel()}
     * discards tasks that are still scheduled, so cancelling right after
     * {@code schedule} would prevent the removal from ever running.
     */
    private static void scheduleChannelRemoval(final ChannelHandlerContext ctx, long delayMillis) {
        final Timer timer = new Timer(true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyChannelMap.remove_(ctx);
                } finally {
                    timer.cancel();
                }
            }
        }, delayMillis);
    }

    /**
     * Sends a text frame to the client behind {@code ctx}.
     *
     * <p>Blocks until the write completes. A failed write is retried up to
     * {@value #MAX_RETRIES} times, pausing between attempts, for as long as the
     * channel stays active. If the message still cannot be delivered, or the
     * connection does not exist, the context is removed from
     * {@code NettyChannelMap}.
     *
     * @param ctx target channel context; may be {@code null}
     * @param msg text to send
     * @author TaoNingBo
     */
    public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
        if (ctx == null || !ctx.channel().isActive()) {
            System.err.println("小程序-》推送失败,长连接不存在");
            NettyChannelMap.remove(ctx);
            return;
        }

        try {
            boolean delivered = deliver(ctx, msg, RETRY_DELAY_MILLIS, "小程序-》推送不成功,将继续推送");
            if (!delivered) {
                NettyChannelMap.remove(ctx);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller, then treat it as a failed push.
            Thread.currentThread().interrupt();
            System.err.println("小程序-》推送发生异常,记录:" + msg);
            NettyChannelMap.remove(ctx);
        } catch (Exception e) {
            System.err.println("小程序-》推送发生异常,记录:" + msg);
            NettyChannelMap.remove(ctx);
        }

        if (isdebug) {
            System.err.println("小程序-》 <<<--send-->>>" + msg);
        }
    }

    /**
     * Sends a text frame to the channel registered under {@code cacheType + id}.
     *
     * <p>Blocks until the write completes. When no channel is registered, or the
     * message cannot be delivered, the message is recorded in {@link #table}
     * under that key; after exhausting the retries the channel is also closed.
     *
     * @param cacheType key prefix of the registered channel
     * @param id        key suffix of the registered channel
     * @param msg       text to send
     */
    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
        String key = cacheType + id;
        ChannelHandlerContext ctx = NettyChannelMap.getData(key);
        if (ctx == null) {
            recordUndelivered(key, msg);
            System.err.println("链接断开,记录:id=" + key + ",消息:" + msg);
            return;
        }

        try {
            boolean delivered = deliver(ctx, msg, 0L, "推送不成功,将继续推送");
            if (!delivered) {
                recordUndelivered(key, msg);
                ctx.close();
                System.err.println("推送发生异常,记录:" + msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            recordUndelivered(key, msg);
            System.err.println("推送发生异常,记录:" + msg);
        } catch (Exception e) {
            recordUndelivered(key, msg);
            System.err.println("推送发生异常,记录:" + msg);
        }

        if (isdebug) {
            System.err.println("<<<--send-->>>" + msg);
        }
    }

    /**
     * Writes the message and retries failed writes while the channel is active.
     *
     * @param retryDelayMillis pause before each retry; no pause when not positive
     * @param retryLog         text printed (followed by the message) before each retry
     * @return {@code true} once a write succeeded, {@code false} when all attempts failed
     */
    private static boolean deliver(ChannelHandlerContext ctx, String msg, long retryDelayMillis,
                                   String retryLog) throws InterruptedException {
        if (writeAndAwait(ctx, msg)) {
            return true;
        }
        // Retrying a closed channel cannot succeed, so stop as soon as it is inactive.
        for (int attempt = 0; attempt < MAX_RETRIES && ctx.channel().isActive(); attempt++) {
            System.err.println(retryLog + msg);
            if (retryDelayMillis > 0) {
                Thread.sleep(retryDelayMillis);
            }
            if (writeAndAwait(ctx, msg)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Writes and flushes one text frame and waits for the outcome.
     *
     * <p>{@code writeAndFlush} is required because a plain {@code write} does not
     * flush. {@code await()} is used instead of {@code sync()} so a failed write
     * is reported through {@code isSuccess()} rather than rethrown.
     */
    private static boolean writeAndAwait(ChannelHandlerContext ctx, String msg) throws InterruptedException {
        ChannelFuture future = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).await();
        return future.isSuccess();
    }

    /** Stores an undelivered message in {@link #table}. */
    @SuppressWarnings("unchecked") // table is declared raw; see the note on the field
    private static void recordUndelivered(String key, String msg) {
        table.put(key, msg);
    }
}