package com.sinata.zuul.util.applets;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.sinata.zuul.util.RedisUtil;
|
import com.sinata.zuul.util.SinataUtil;
|
import com.sinata.zuul.util.SpringUtil;
|
import com.sinata.zuul.util.StringUtil;
|
import com.sinata.zuul.util.echo.Method;
|
import com.sinata.zuul.util.echo.NettyChannelMap;
|
import com.sinata.zuul.util.echo.NettyMsg;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
|
import java.util.HashMap;
|
import java.util.Hashtable;
|
import java.util.Timer;
|
import java.util.TimerTask;
|
|
public class NettyWebSocketController {
|
|
public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
|
|
private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
|
|
public static Hashtable<String, String> table;
|
|
static {
|
if (table == null) {
|
table = new Hashtable<>();
|
}
|
}
|
|
public static boolean isdebug = false;
|
public static int i = 0;
|
|
|
/**
|
* 判断客户端要执行什么操作
|
*
|
* @param ctx
|
* @param msg
|
* @author TaoNingBo
|
*/
|
public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
|
try {
|
// 验证即时通讯命令是否正确有效
|
if (SinataUtil.isEmpty(msg)) {
|
return;
|
}
|
String msgStr = msg.toString();
|
if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
|
return;
|
}
|
if (isdebug) {
|
// System.out.println("<<<--receive-->>>111" + msg);
|
}
|
|
// 获取socket信息,保存相应的socket
|
JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
|
int code = jsonMsg.getIntValue("code");
|
String message = jsonMsg.getString("msg");
|
String method = jsonMsg.getString("method");
|
if (code != 200 || !message.equals("SUCCESS")) {
|
return;
|
}
|
JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
|
|
if (null != ctx && ctx.channel().isActive()) {
|
jsonMsg.put("method", Method.pong);
|
sendMsgToClient(ctx, jsonMsg.toJSONString());
|
}
|
|
|
// ############################### 心跳 ############################
|
// 心跳
|
if (method.equals(Method.ping)) {
|
String token = jsonCon.getString("token");
|
String userId1 = jsonCon.getString("userId");
|
if (StringUtil.isNotEmpty(userId1)) {
|
//确保账号在单个设备上登录
|
if (StringUtil.isNotEmpty(token)) {
|
String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据
|
if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据
|
ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(0, 23));
|
|
JSONObject msg_ = new JSONObject();
|
msg_.put("code", 200);
|
msg_.put("msg", "SUCCESS");
|
msg_.put("method", "OFFLINE");
|
msg_.put("data", new Object());
|
this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
|
TimerTask timerTask = new TimerTask() {
|
@Override
|
public void run() {
|
NettyChannelMap.remove_(data_);
|
NettyChannelMap.remove(data_);
|
}
|
};
|
Timer timer = new Timer();
|
timer.schedule(timerTask, 3000);
|
timer.cancel();
|
}
|
|
NettyChannelMap.update_(token.substring(0, 23), ctx);
|
NettyChannelMap.update("Applets" + userId1, ctx);
|
String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
|
|
if (StringUtil.isEmpty(token_)) {//确保登录的时候存储token失败的情况
|
redisUtil.setStrValue("USER_Applets_" + userId1, token);
|
}
|
}
|
|
//存储业务使用的通道
|
if (null != ctx && ctx.channel().isActive()) {
|
NettyChannelMap.update("Applets" + userId1, ctx);
|
// String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
// ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
|
}
|
}
|
|
|
}
|
} catch (Exception e) {
|
if (isdebug) {
|
NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
|
}
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 向客户端发送消息
|
*
|
* @param ctx
|
* @param msg
|
* @author TaoNingBo
|
*/
|
public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
|
// System.out.println(ctx.channel().isActive());
|
if (ctx != null && ctx.channel().isActive()) {
|
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
|
ChannelFuture sync;
|
try {
|
sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
|
if (!sync.isSuccess()) {
|
boolean b = true;
|
for (int i = 0; i < 10; i++) {
|
ctx.wait(3000);
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
if (sync.isSuccess()) {
|
b = false;
|
break;
|
}
|
System.err.println("小程序-》推送不成功,将继续推送" + msg);
|
}
|
if (b) {
|
NettyChannelMap.remove(ctx);
|
}
|
}
|
} catch (Exception e) {
|
System.err.println("小程序-》推送发生异常,记录:" + msg);
|
NettyChannelMap.remove(ctx);
|
}
|
if (isdebug) {
|
System.err.println("小程序-》 <<<--send-->>>" + msg);
|
}
|
} else {
|
System.err.println("小程序-》推送失败,长连接不存在");
|
NettyChannelMap.remove(ctx);
|
}
|
}
|
|
// **链接断开 将推送消息记录
|
public static void sendMsgToClient(String cacheType, Integer id, String msg) {
|
ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
|
if (ctx != null) {
|
ChannelFuture sync;
|
try {
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
if (!sync.isSuccess()) {
|
for (int i = 0; i < 10; i++) {
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
;
|
if (!sync.isSuccess()) {
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
;
|
System.err.println("推送不成功,将继续推送" + msg);
|
if (i == 9) {
|
table.put(cacheType + id, msg);
|
ctx.close();
|
System.err.println("推送发生异常,记录:" + msg);
|
}
|
} else {
|
break;
|
}
|
}
|
}
|
} catch (Exception e) {
|
table.put(cacheType + id, msg);
|
System.err.println("推送发生异常,记录:" + msg);
|
}
|
if (isdebug) {
|
System.err.println("<<<--send-->>>" + msg);
|
}
|
} else {
|
table.put(cacheType + id, msg);
|
System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
|
}
|
}
|
}
|