package com.sinata.push.util.applets;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.sinata.push.util.echo.Method;
|
import com.sinata.push.util.echo.NettyChannelMap;
|
import com.sinata.push.util.echo.NettyMsg;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.handler.codec.http.*;
|
import io.netty.handler.codec.http.websocketx.*;
|
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.util.CharsetUtil;
|
|
import java.util.HashMap;
|
|
|
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
|
//用于websocket握手的处理类
|
private WebSocketServerHandshaker handshaker;
|
|
private static final String WEB_SOCKET_URL = "wss://localhost:9090/websocket";
|
|
|
|
// @Override
|
// protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
// if (msg instanceof FullHttpRequest) {
|
// // websocket连接请求
|
// handleHttpRequest(ctx, (FullHttpRequest)msg);
|
// } else if (msg instanceof WebSocketFrame) {
|
// // websocket业务处理
|
// handleWebSocketRequest(ctx, (WebSocketFrame)msg);
|
// }
|
// }
|
|
@Override
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
ctx.flush();
|
}
|
|
/** 心跳 */
|
@Override
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
if (evt instanceof IdleStateEvent) {
|
IdleStateEvent event = (IdleStateEvent) evt;
|
if (event.state().equals(IdleState.READER_IDLE))
|
{
|
//
|
}
|
else if (event.state().equals(IdleState.WRITER_IDLE))
|
{
|
//
|
}
|
else if (event.state().equals(IdleState.ALL_IDLE))
|
{
|
String msg = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
|
if(ctx != null && ctx.channel().isActive()) {
|
ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes()));
|
}
|
}
|
}
|
// super.userEventTriggered(ctx, evt);
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
ctx.close();
|
}
|
|
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
|
// Http解码失败,向服务器指定传输的协议为Upgrade:websocket
|
if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){
|
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
|
return;
|
}
|
// 握手相应处理,创建websocket握手的工厂类,
|
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
|
// 根据工厂类和HTTP请求创建握手类
|
handshaker = wsFactory.newHandshaker(req);
|
if (handshaker == null) {
|
// 不支持websocket
|
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
|
} else {
|
// 通过它构造握手响应消息返回给客户端
|
handshaker.handshake(ctx.channel(), req);
|
}
|
}
|
|
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception {
|
if (req instanceof CloseWebSocketFrame) {
|
// 关闭websocket连接
|
handshaker.close(ctx.channel(), (CloseWebSocketFrame)req.retain());
|
return;
|
}
|
if (req instanceof PingWebSocketFrame) {
|
ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
|
return;
|
}
|
if (!(req instanceof TextWebSocketFrame)) {
|
throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
|
}
|
if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
|
throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
|
}
|
String requestmsg = ((TextWebSocketFrame) req).text();
|
|
|
//给连接的客户端返回数据
|
//返回心跳
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put("code", 200);
|
jsonObject.put("method", Method.ok);
|
jsonObject.put("msg", "SUCCESS");
|
jsonObject.put("data", new JSONObject());
|
TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
|
// ctx.channel().writeAndFlush(tws);
|
|
new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理
|
|
// 群发服务端心跳响应
|
Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
|
}
|
|
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
|
// BAD_REQUEST(400) 客户端请求错误返回的应答消息
|
if(res.getStatus().code() != 200){
|
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
|
res.content().writeBytes(buf);
|
buf.release();
|
}
|
//服务端向客户端发送数据
|
ChannelFuture f = ctx.channel().writeAndFlush(res);
|
// 非法连接直接关闭连接
|
if(res.getStatus().code() != 200){
|
f.addListener(ChannelFutureListener.CLOSE);
|
}
|
}
|
|
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
Global.group.add(ctx.channel());
|
System.err.println("客户端与服务器端开启");
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
Global.group.remove(ctx.channel());
|
NettyChannelMap.remove(ctx);
|
System.err.println("客户端与服务器链接关闭");
|
}
|
|
|
@Override
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
if (msg instanceof FullHttpRequest) {
|
// websocket连接请求
|
handleHttpRequest(ctx, (FullHttpRequest)msg);
|
} else if (msg instanceof WebSocketFrame) {
|
// websocket业务处理
|
handleWebSocketRequest(ctx, (WebSocketFrame)msg);
|
}
|
}
|
|
@Override
|
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
if (msg instanceof FullHttpRequest) {
|
// websocket连接请求
|
handleHttpRequest(ctx, (FullHttpRequest)msg);
|
} else if (msg instanceof WebSocketFrame) {
|
// websocket业务处理
|
handleWebSocketRequest(ctx, (WebSocketFrame)msg);
|
}
|
}
|
}
|