package com.ruoyi.other.webSocket;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.ruoyi.common.core.domain.R;
|
import com.ruoyi.common.core.utils.StringUtils;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
|
import java.util.Hashtable;
|
|
public class NettyWebSocketController {
|
|
public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
|
|
|
public static Hashtable<String, String> table;
|
public static int i = 0;
|
|
static {
|
if (table == null) {
|
table = new Hashtable<>();
|
}
|
}
|
|
/**
|
* 向客户端发送消息
|
*
|
* @param ctx
|
* @param msg
|
* @author TaoNingBo
|
*/
|
public static R sendMsgToClient(ChannelHandlerContext ctx, String msg) {
|
if (ctx != null && ctx.channel().isActive()) {
|
ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
|
ChannelFuture sync;
|
try {
|
sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
|
if (!sync.isSuccess()) {
|
boolean b = true;
|
for (int i = 0; i < 10; i++) {
|
ctx.wait(3000);
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
if (sync.isSuccess()) {
|
b = false;
|
break;
|
}
|
System.err.println("小程序-》推送不成功,将继续推送" + msg);
|
}
|
if (b) {
|
NettyChannelMap.remove(ctx);
|
return R.fail("无效的消息通道");
|
}
|
}
|
return R.ok();
|
} catch (Exception e) {
|
NettyChannelMap.remove(ctx);
|
e.printStackTrace();
|
return R.fail("发送消息失败:" + e.getMessage());
|
}
|
} else {
|
NettyChannelMap.remove(ctx);
|
return R.fail("无效的消息通道");
|
}
|
}
|
|
// **链接断开 将推送消息记录
|
public static void sendMsgToClient(String cacheType, Integer id, String msg) {
|
ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
|
if (ctx != null) {
|
ChannelFuture sync;
|
try {
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
if (!sync.isSuccess()) {
|
for (int i = 0; i < 10; i++) {
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
;
|
if (!sync.isSuccess()) {
|
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
|
;
|
System.err.println("推送不成功,将继续推送" + msg);
|
if (i == 9) {
|
table.put(cacheType + id, msg);
|
ctx.close();
|
System.err.println("推送发生异常,记录:" + msg);
|
}
|
} else {
|
break;
|
}
|
}
|
}
|
} catch (Exception e) {
|
table.put(cacheType + id, msg);
|
System.err.println("推送发生异常,记录:" + msg);
|
}
|
} else {
|
table.put(cacheType + id, msg);
|
System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
|
}
|
}
|
|
/**
|
* 判断客户端要执行什么操作
|
*
|
* @param ctx
|
* @param msg
|
* @author TaoNingBo
|
*/
|
public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
|
try {
|
// 验证即时通讯命令是否正确有效
|
if (StringUtils.isEmpty(msg)) {
|
return;
|
}
|
String msgStr = msg.toString();
|
if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
|
return;
|
}
|
|
// 获取socket信息,保存相应的socket
|
JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
|
int code = jsonMsg.getIntValue("code");
|
String message = jsonMsg.getString("msg");
|
String method = jsonMsg.getString("method");
|
if (code != 200 || !message.equals("SUCCESS")) {
|
return;
|
}
|
JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
|
|
if (null != ctx && ctx.channel().isActive()) {
|
jsonMsg.put("method", Method.pong);
|
sendMsgToClient(ctx, jsonMsg.toJSONString());
|
}
|
|
|
// ############################### 心跳 ############################
|
// 心跳
|
if (method.equals(Method.ping)) {
|
String userId1 = jsonCon.getString("userId");
|
if (StringUtils.isNotEmpty(userId1)) {
|
//存储业务使用的通道
|
if (null != ctx && ctx.channel().isActive()) {
|
NettyChannelMap.update("Applets" + userId1, ctx);
|
}
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}
|