package com.ruoyi.admin.netty;
|
|
import com.alibaba.fastjson.JSONObject;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.*;
|
import io.netty.handler.codec.http.*;
|
import io.netty.handler.codec.http.websocketx.*;
|
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.util.CharsetUtil;
|
|
import java.util.HashMap;
|
|
|
/**
|
* @author HJL
|
*/
|
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
|
/**
|
* 用于websocket握手的处理类
|
*/
|
private WebSocketServerHandshaker handshaker;
|
|
private static final String WEB_SOCKET_URL = "ws://localhost:9595/websocket";
|
|
|
// @Override
|
// protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
// if (msg instanceof FullHttpRequest) {
|
// // websocket连接请求
|
// handleHttpRequest(ctx, (FullHttpRequest)msg);
|
// } else if (msg instanceof WebSocketFrame) {
|
// // websocket业务处理
|
// handleWebSocketRequest(ctx, (WebSocketFrame)msg);
|
// }
|
// }
|
|
@Override
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
ctx.flush();
|
}
|
|
/**
|
* 心跳
|
*/
|
@Override
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
if (evt instanceof IdleStateEvent) {
|
IdleStateEvent event = (IdleStateEvent) evt;
|
if (event.state().equals(IdleState.READER_IDLE)) {
|
//
|
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
|
//
|
} else if (event.state().equals(IdleState.ALL_IDLE)) {
|
String msg = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
if (ctx != null && ctx.channel().isActive()) {
|
ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes()));
|
}
|
}
|
}
|
// super.userEventTriggered(ctx, evt);
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
ctx.close();
|
}
|
|
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
|
// Http解码失败,向服务器指定传输的协议为Upgrade:websocket
|
if (!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))) {
|
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
|
return;
|
}
|
// 客户端连接信息
|
// String s = String.valueOf(ctx.channel().remoteAddress());
|
// int i = s.indexOf(":");
|
// String substring = s.substring(1, i);
|
// NettyChannelMap.map.put(substring, ctx);
|
// 握手相应处理,创建websocket握手的工厂类,
|
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
|
// 根据工厂类和HTTP请求创建握手类
|
handshaker = wsFactory.newHandshaker(req);
|
if (handshaker == null) {
|
// 不支持websocket
|
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
|
} else {
|
// 通过它构造握手响应消息返回给客户端
|
handshaker.handshake(ctx.channel(), req);
|
}
|
}
|
|
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception {
|
if (req instanceof CloseWebSocketFrame) {
|
// 关闭websocket连接
|
handshaker.close(ctx.channel(), (CloseWebSocketFrame) req.retain());
|
return;
|
}
|
if (req instanceof PingWebSocketFrame) {
|
ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
|
return;
|
}
|
if (!(req instanceof TextWebSocketFrame)) {
|
throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
|
}
|
if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
|
throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
|
}
|
String requestmsg = ((TextWebSocketFrame) req).text();
|
|
System.out.println("客户端消息解析:" + requestmsg);
|
if (NettyChannelMap.map.containsKey(requestmsg)) {
|
NettyChannelMap.map.remove(requestmsg);
|
NettyChannelMap.map.put(requestmsg, ctx);
|
} else {
|
NettyChannelMap.map.put(requestmsg, ctx);
|
}
|
//给连接的客户端返回数据
|
//返回心跳
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("code", 200);
|
jsonObject.put("method", Method.ok);
|
jsonObject.put("msg", "SUCCESS");
|
jsonObject.put("data", new JSONObject());
|
// TextWebSocketFrame tws = new TextWebSocketFrame("您有一条新的订单,请及时处理!");
|
TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
|
// ctx.channel().writeAndFlush(tws);
|
// 小程序心跳处理
|
new NettyWebSocketController().JudgeOperation(ctx, requestmsg);
|
|
// 群发服务端心跳响应
|
// Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
|
// Global.group.write(new TextWebSocketFrame((tws).text()));
|
}
|
|
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
|
// BAD_REQUEST(400) 客户端请求错误返回的应答消息
|
if (res.getStatus().code() != 200) {
|
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
|
res.content().writeBytes(buf);
|
buf.release();
|
}
|
//服务端向客户端发送数据
|
ChannelFuture f = ctx.channel().writeAndFlush(res);
|
// 非法连接直接关闭连接
|
if (res.getStatus().code() != 200) {
|
f.addListener(ChannelFutureListener.CLOSE);
|
}
|
}
|
|
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
Global.group.add(ctx.channel());
|
// NettyChannelMap.map.put()
|
System.err.println("客户端与服务器端开启");
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
Global.group.remove(ctx.channel());
|
NettyChannelMap.remove(ctx);
|
System.err.println("客户端与服务器链接关闭");
|
}
|
|
@Override
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
if (msg instanceof FullHttpRequest) {
|
// websocket连接请求
|
handleHttpRequest(ctx, (FullHttpRequest) msg);
|
} else if (msg instanceof WebSocketFrame) {
|
// websocket业务处理
|
System.out.println("客户端发来消息:" + ctx.channel().remoteAddress());
|
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
|
}
|
}
|
|
@Override
|
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
System.out.println("客户端消息:" + msg);
|
if (msg instanceof FullHttpRequest) {
|
// websocket连接请求
|
handleHttpRequest(ctx, (FullHttpRequest) msg);
|
} else if (msg instanceof WebSocketFrame) {
|
// websocket业务处理
|
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
|
}
|
}
|
|
@Override
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
Channel channel = ctx.channel();
|
System.err.println("有新的客户端与服务器发生连接。客户端地址:" + channel.remoteAddress());
|
// channelGroup.add(channel);
|
}
|
}
|