package cn.mb.cloud.gateway.util.applets; import cn.mb.cloud.gateway.util.echo.Method; import cn.mb.cloud.gateway.util.echo.NettyChannelMap; import cn.mb.cloud.gateway.util.echo.NettyMsg; import com.alibaba.fastjson.JSONObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import java.util.HashMap; public class WebSocketHandler extends SimpleChannelInboundHandler { //用于websocket握手的处理类 private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "wss://localhost:9090/websocket"; // @Override // protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { // if (msg instanceof FullHttpRequest) { // // websocket连接请求 // handleHttpRequest(ctx, (FullHttpRequest)msg); // } else if (msg instanceof WebSocketFrame) { // // websocket业务处理 // handleWebSocketRequest(ctx, (WebSocketFrame)msg); // } // } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** 心跳 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { // } else if (event.state().equals(IdleState.WRITER_IDLE)) { // } else if (event.state().equals(IdleState.ALL_IDLE)) { String msg = NettyMsg.setMsg(Method.ok, new HashMap()); if(ctx != null && ctx.channel().isActive()) { ctx.writeAndFlush(Unpooled.copiedBuffer((msg).getBytes())); } } } // super.userEventTriggered(ctx, evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { // Http解码失败,向服务器指定传输的协议为Upgrade:websocket if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } // 握手相应处理,创建websocket握手的工厂类, WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false); // 根据工厂类和HTTP请求创建握手类 handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { // 不支持websocket WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { // 通过它构造握手响应消息返回给客户端 handshaker.handshake(ctx.channel(), req); } } private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception { if (req instanceof CloseWebSocketFrame) { // 关闭websocket连接 handshaker.close(ctx.channel(), (CloseWebSocketFrame)req.retain()); return; } if (req instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(req.content().retain())); return; } if (!(req instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息"); } if (ctx == null || this.handshaker == null || ctx.isRemoved()) { throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息"); } String requestmsg = ((TextWebSocketFrame) req).text(); //给连接的客户端返回数据 //返回心跳 JSONObject jsonObject = new JSONObject(); jsonObject.put("code", 200); jsonObject.put("method", Method.ok); jsonObject.put("msg", "SUCCESS"); jsonObject.put("data", new JSONObject()); TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); // ctx.channel().writeAndFlush(tws); new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理 // 群发服务端心跳响应 Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // BAD_REQUEST(400) 客户端请求错误返回的应答消息 if(res.getStatus().code() != 200){ ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } //服务端向客户端发送数据 ChannelFuture f = ctx.channel().writeAndFlush(res); // 非法连接直接关闭连接 if(res.getStatus().code() != 200){ f.addListener(ChannelFutureListener.CLOSE); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Global.group.add(ctx.channel()); System.err.println("客户端与服务器端开启"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Global.group.remove(ctx.channel()); NettyChannelMap.remove(ctx); System.err.println("客户端与服务器链接关闭"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { // websocket连接请求 handleHttpRequest(ctx, (FullHttpRequest)msg); } else if (msg instanceof WebSocketFrame) { // websocket业务处理 handleWebSocketRequest(ctx, (WebSocketFrame)msg); } } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { // websocket连接请求 handleHttpRequest(ctx, (FullHttpRequest)msg); } else if (msg instanceof WebSocketFrame) { // websocket业务处理 handleWebSocketRequest(ctx, (WebSocketFrame)msg); } } }