From 4872bb7719c4ccaaab99438af3d987787c818c2a Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期二, 05 八月 2025 23:14:48 +0800 Subject: [PATCH] 提交推送服务 --- MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java | 11 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java | 27 ++ MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 146 +++++++--------- MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java | 209 +++++++++++----------- MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java | 16 - MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java | 51 +++++ MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java | 4 MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java | 3 MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java | 12 MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java | 32 +-- 10 files changed, 270 insertions(+), 241 deletions(-) diff --git a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java index 7056cec..c39c8c9 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java @@ -18,11 +18,6 @@ @RequestMapping("/netty") public class NettyController { - @Autowired - private NettyServerController nettyServerController; - - @Autowired - private NettyWebSocketController nettyWebSocketController; /** @@ -37,12 +32,12 @@ if(type == 1){//用户端 ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 if(null != channel){ - nettyWebSocketController.sendMsgToClient(channel, msg); + NettyWebSocketController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } channel = NettyChannelMap.getData("USER" + id); if(null != channel){ - nettyServerController.sendMsgToClient(channel, msg); + NettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id)); @@ -53,7 +48,7 @@ System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map)); ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); if(null != channel){ - nettyServerController.sendMsgToClient(channel, msg); + NettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id)); diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java b/MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java new file mode 100644 index 0000000..1485749 --- /dev/null +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java @@ -0,0 +1,51 @@ +package com.sinata.push.util; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + + +/** + * Redis工具类 + */ +@Component +public class RedisUtil { + + @Autowired + private RedisTemplate<String, String> redisTemplate; + + + /** + * 向redis中存储字符串没有过期时间 + * @param key + * @param value + */ + public void setStrValue(String key, String value){ + redisTemplate.opsForValue().set(key, value); + } + + + /** + * 以分钟为单位设置存储值(设置过期时间) + * @param key + * @param value + * @param time 秒 + */ + public void setStrValue(String key, String value, int time){ + redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); + } + + + /** + * 从redis中获取值 + * @param key + * @return + */ + public String getValue(String key){ + return redisTemplate.opsForValue().get(key); + } + + +} diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java index 5ab55f9..f7bbac0 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java @@ -4,25 +4,17 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.ssl.ClientAuth; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.stream.ChunkedWriteHandler; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import java.io.File; public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { -// String path = "/root/server/app/cert/qytzt.cn.jks"; +// String path = "C:\\Program Files\\Apache Software Foundation\\Tomcat 8.5\\cert\\SHA256withRSA_lzhyc.cn.pfx"; +// String path = "/usr/local/server/app/cert/tomcat/scs1680576839056_chaoshengdaijia.com_server.jks"; // SSLContext sslContext = createSSLContext.createSSLContext("JKS" -// , path, "bo27xqbr"); +// , path, "Zf3^5v6OoWmOVgeQ"); + //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 // SSLEngine engine = sslContext.createSSLEngine(); -// //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 // engine.setUseClientMode(false); // socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java index dddbca5..0e6ac87 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java @@ -2,43 +2,36 @@ import com.alibaba.fastjson.JSONObject; +import com.sinata.push.util.RedisUtil; import com.sinata.push.util.SinataUtil; import com.sinata.push.util.SpringUtil; import com.sinata.push.util.StringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; -import com.sinata.push.util.echo.NettyMsg; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.util.HashMap; import java.util.Hashtable; import java.util.Timer; import java.util.TimerTask; -@Component public class NettyWebSocketController { - public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); - - @Resource - private RedisTemplate<String, Object> redisTemplate; + public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); - public static Hashtable<String,String> table; - static{ - if(table == null){ - table = new Hashtable<>(); - } - } - + private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + + public static Hashtable<String, String> table; + + static { + if (table == null) { + table = new Hashtable<>(); + } + } + public static boolean isdebug = false; public static int i = 0; @@ -50,78 +43,80 @@ * @param msg * @author TaoNingBo */ - public void JudgeOperation(ChannelHandlerContext ctx, String msg) { + public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) { try { - // 验证即时通讯命令是否正确有效 - if(SinataUtil.isEmpty(msg)) { - return; - } - String msgStr = msg.toString(); - if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { - return; - } - if(isdebug) { + // 验证即时通讯命令是否正确有效 + if (SinataUtil.isEmpty(msg)) { + return; + } + String msgStr = msg.toString(); + if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { + return; + } + if (isdebug) { // System.out.println("<<<--receive-->>>111" + msg); - } + } - // 获取socket信息,保存相应的socket - JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); - int code = jsonMsg.getIntValue("code"); - String message = jsonMsg.getString("msg"); - String method = jsonMsg.getString("method"); - if(code != 200 || !message.equals("SUCCESS")) { - return; - } - JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); - // ############################### 心跳 ############################ - // 心跳 - if(method.equals(Method.ping)){ - String token = jsonCon.getString("token"); - String userId1 = jsonCon.getString("userId"); - if(StringUtil.isNotEmpty(userId1)){ - //确保账号在单个设备上登录 - if(StringUtil.isNotEmpty(token)){ - String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 - if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 - ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1); - JSONObject msg_ = new JSONObject(); - msg_.put("code", 200); - msg_.put("msg", "SUCCESS"); - msg_.put("method", "OFFLINE"); - msg_.put("data", new Object()); - this.sendMsgToClient(context, msg_.toJSONString()); - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - NettyChannelMap.remove(context); - } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); - } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("USER_" + userId1, token); - } - } + // 获取socket信息,保存相应的socket + JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); + int code = jsonMsg.getIntValue("code"); + String message = jsonMsg.getString("msg"); + String method = jsonMsg.getString("method"); + if (code != 200 || !message.equals("SUCCESS")) { + return; + } + JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); - //存储业务使用的通道 - if(null != ctx && ctx.channel().isActive()) { - NettyChannelMap.update("Applets" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } + if (null != ctx && ctx.channel().isActive()) { + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); } + // ############################### 心跳 ############################ + // 心跳 + if (method.equals(Method.ping)) { + String token = jsonCon.getString("token"); + String userId1 = jsonCon.getString("userId"); + if (StringUtil.isNotEmpty(userId1)) { + //确保账号在单个设备上登录 + if (StringUtil.isNotEmpty(token)) { + String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据 + if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据 + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); + JSONObject msg_ = new JSONObject(); + msg_.put("code", 200); + msg_.put("msg", "SUCCESS"); + msg_.put("method", "OFFLINE"); + msg_.put("data", new Object()); + this.sendMsgToClient(data_, msg_.toJSONString()); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + NettyChannelMap.remove_(data_); + } + }, 5000); + } + NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道 + NettyChannelMap.update("Applets" + userId1, ctx); + redisUtil.setStrValue("USER_Applets_" + userId1, token); + } + + //存储业务使用的通道 + if (null != ctx && ctx.channel().isActive()) { + NettyChannelMap.update("Applets" + userId1, ctx); + } + } + + + } + } catch (Exception e) { + if (isdebug) { + NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); + } + e.printStackTrace(); } - } catch (Exception e) { - if(isdebug) { - this.sendMsgToClient(ctx, "__error__" + msg.toString()); - } - e.printStackTrace(); } -} /** * 向客户端发送消息 @@ -137,67 +132,69 @@ ChannelFuture sync; try { sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); - if(!sync.isSuccess()){ + if (!sync.isSuccess()) { boolean b = true; for (int i = 0; i < 10; i++) { ctx.wait(3000); sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); - if(sync.isSuccess()){ + if (sync.isSuccess()) { b = false; break; } - System.err.println("小程序-》推送不成功,将继续推送"+msg); + System.err.println("小程序-》推送不成功,将继续推送" + msg); } - if(b){ + if (b) { NettyChannelMap.remove(ctx); } } } catch (Exception e) { - System.err.println("小程序-》推送发生异常,记录:"+msg); + System.err.println("小程序-》推送发生异常,记录:" + msg); NettyChannelMap.remove(ctx); } - if(isdebug) { - System.err.println("小程序-》 <<<--send-->>>" + msg) ; + if (isdebug) { + System.err.println("小程序-》 <<<--send-->>>" + msg); } - }else{ + } else { System.err.println("小程序-》推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } } // **链接断开 将推送消息记录 - public static void sendMsgToClient(String cacheType, Integer id,String msg) { + public static void sendMsgToClient(String cacheType, Integer id, String msg) { ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); if (ctx != null) { ChannelFuture sync; try { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); - if(!sync.isSuccess()){ + if (!sync.isSuccess()) { for (int i = 0; i < 10; i++) { - sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; - if(!sync.isSuccess()){ - sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; - System.err.println("推送不成功,将继续推送"+msg); - if(i == 9){ - table.put(cacheType+id, msg); + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + if (!sync.isSuccess()) { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + System.err.println("推送不成功,将继续推送" + msg); + if (i == 9) { + table.put(cacheType + id, msg); ctx.close(); - System.err.println("推送发生异常,记录:"+msg); + System.err.println("推送发生异常,记录:" + msg); } - }else{ + } else { break; } } } } catch (Exception e) { - table.put(cacheType+id, msg); - System.err.println("推送发生异常,记录:"+msg); + table.put(cacheType + id, msg); + System.err.println("推送发生异常,记录:" + msg); } - if(isdebug) { + if (isdebug) { System.err.println("<<<--send-->>>" + msg); } - }else{ - table.put(cacheType+id, msg); - System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg); + } else { + table.put(cacheType + id, msg); + System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); } } } diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java index d4843ce..7283430 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java @@ -1,7 +1,6 @@ package com.sinata.push.util.applets; import com.alibaba.fastjson.JSONObject; -import com.sinata.push.util.SpringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyMsg; @@ -16,7 +15,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; -import org.springframework.data.redis.core.StringRedisTemplate; import java.util.HashMap; @@ -26,8 +24,6 @@ private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket"; - -// private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class); @@ -77,13 +73,8 @@ } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { - HttpHeaders headers = req.headers(); - headers.names().forEach(name -> { - System.out.println(name + ":" + headers.get(name)); - }); // Http解码失败,向服务器指定传输的协议为Upgrade:websocket if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){ - System.out.println("Http解码失败"); sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } @@ -92,11 +83,9 @@ // 根据工厂类和HTTP请求创建握手类 handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { - System.out.println("不支持websocket"); // 不支持websocket WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { - System.out.println("通过它构造握手响应消息返回给客户端"); // 通过它构造握手响应消息返回给客户端 handshaker.handshake(ctx.channel(), req); } @@ -123,21 +112,18 @@ //给连接的客户端返回数据 //返回心跳 -// JSONObject jsonObject = new JSONObject(); -// jsonObject.put("code", 200); -// jsonObject.put("method", Method.ok); -// jsonObject.put("msg", "SUCCESS"); -// jsonObject.put("data", new JSONObject()); -// TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("code", 200); + jsonObject.put("method", Method.ok); + jsonObject.put("msg", "SUCCESS"); + jsonObject.put("data", new JSONObject()); + TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); // ctx.channel().writeAndFlush(tws); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - - -// nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理 + + new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理 // 群发服务端心跳响应 -// Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); + Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java index c2a291d..797138d 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java @@ -1,6 +1,5 @@ package com.sinata.push.util.echo; -import com.sinata.push.util.SpringUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -8,14 +7,13 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; -import org.springframework.data.redis.core.RedisTemplate; import java.net.InetSocketAddress; import java.util.HashMap; public class DiscardServerHandler extends SimpleChannelInboundHandler<String> { - private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class); + private NettyServerController nettyServerController = new NettyServerController(); public static boolean isdebug = true; @@ -23,7 +21,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); if(isdebug) { - System.err.println(insocket.getAddress() + ": 收到客户端数据......."); +// System.err.println(insocket.getAddress() + ": 收到客户端数据......."); } try { // 调用service @@ -39,7 +37,7 @@ protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); if(isdebug) { - System.err.println(insocket.getAddress() + ": 收到客户端数据......."); +// System.err.println(insocket.getAddress() + ": 收到客户端数据......."); } try { // 调用service @@ -70,7 +68,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); if(isdebug) { - System.err.println(insocket.getAddress() + ": Connect successful......"); +// System.err.println(insocket.getAddress() + ": Connect successful......"); } } @@ -106,7 +104,7 @@ // System.err.println(insocket.getAddress() + ": Disconnect connection......"); } NettyChannelMap.remove(ctx); - System.err.println("清除通道" + ctx); +// System.err.println("清除通道" + ctx); // super.channelInactive(ctx); } diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java index cea0d88..b5f75d5 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java @@ -15,6 +15,9 @@ /** 心跳【接收】 */ public final static String ping = "PING"; + /** 心跳【响应】 */ + public final static String pong = "PONG"; + /** 司机上传位置 */ public static final String location = "LOCATION"; diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java index 228c11d..a0dd469 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java @@ -26,7 +26,7 @@ */ public static ChannelHandlerContext getData(String key) { if(map==null){ - map = new ConcurrentHashMap<String, ChannelHandlerContext>(); + map = new HashMap<String, ChannelHandlerContext>(); } return map.get(key); } @@ -78,6 +78,9 @@ */ @SuppressWarnings("rawtypes") public static synchronized void remove(ChannelHandlerContext value) { + if(null == value){ + return; + } Set<String> strings = map.keySet(); for(String key : strings){ ChannelHandlerContext channelHandlerContext = map.get(key); @@ -91,6 +94,23 @@ } + public static synchronized void remove_(ChannelHandlerContext value) { + Set<String> strings = ctxMap.keySet(); + for(String key : strings){ + ChannelHandlerContext channelHandlerContext = ctxMap.get(key); + String s = channelHandlerContext.channel().remoteAddress().toString(); + String s1 = value.channel().remoteAddress().toString(); + if(s.equals(s1)){ + channelHandlerContext.close();//关闭通道 + ctxMap.remove(key); + } + } + } + + + public static synchronized void remove_(String key) { + ctxMap.remove(key); + } /** @@ -113,4 +133,9 @@ map.put(key, value); } + + + public static synchronized void update_(String key, ChannelHandlerContext value) { + ctxMap.put(key, value); + } } diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java index 40206a7..94b0a31 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java @@ -36,7 +36,7 @@ public void run() { thread.start(); } - }, 10000); + }, 3999); } /** @@ -61,7 +61,7 @@ bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 通过TCP_NODELAY禁用NAGLE,使消息立即发出去,不用等待到一定的数据量才发出去 - bootstrap.childOption(ChannelOption.TCP_NODELAY, true); + bootstrap.option(ChannelOption.TCP_NODELAY, true); // 保持长连接状态 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ServerInit() { diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java index c4ae7ad..0f10527 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java @@ -10,16 +10,16 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; +import org.springframework.web.client.RestTemplate; -import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.TimeUnit; /** @@ -28,20 +28,17 @@ * @createDate 2016年6月3日 * @version 1.0 */ -@Component public class NettyServerController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); public static Hashtable<String,String> table; - - @Resource - private RedisTemplate<String, Object> redisTemplate; - + private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + + + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - - static{ @@ -61,7 +58,7 @@ * @param msg * @author TaoNingBo */ - public void JudgeOperation(ChannelHandlerContext ctx, Object msg) { + public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) { try { // ByteBuf转String ByteBuf byteBuf = (ByteBuf) msg; @@ -99,105 +96,87 @@ String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); if(StringUtil.isNotEmpty(userId1)){ - String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); - if(!StringUtils.hasLength(fluid_control)){ - redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); - }else{ - long l = System.currentTimeMillis() - Long.valueOf(fluid_control); - if(l >= 10000){ - redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); - }else{ - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - return; - } - } - //判断用户或者司机长连接 + //判断用户或者司机长连接 if(type==1){ - //存储通讯通道 - if(null != ctx && ctx.channel().isActive()){ - System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); - NettyChannelMap.update("USER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ - String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 + String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 - ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1); + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); - this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 - TimerTask timerTask = new TimerTask() { + boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 + if(b){ + NettyChannelMap.remove_(data_); + } + new Timer().schedule(new TimerTask() { @Override public void run() { - NettyChannelMap.remove(context); + NettyChannelMap.remove_(data_); } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); + }, 5000); } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("USER_" + userId1, token); - } + NettyChannelMap.update_(token.substring(token.length() - 16), ctx); + NettyChannelMap.update("USER" + userId1, ctx); + redisUtil.setStrValue("USER_APP_" + userId1, token); } + + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + NettyChannelMap.update("USER" + userId1, ctx); + } }else{ - //存储通讯通道 - if(null != ctx && ctx.channel().isActive()){ - System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); - NettyChannelMap.update("DRIVER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } //确保账号在单个设备上登录 - String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); - if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作 - String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 + if(StringUtil.isNotEmpty(token)){//APP端登录的操作 + String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据 - ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1); - JSONObject msg_ = new JSONObject(); - msg_.put("code", 200); - msg_.put("msg", "SUCCESS"); - msg_.put("method", "OFFLINE"); - msg_.put("data", new Object()); - this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - NettyChannelMap.remove(context); - } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); + if(null != data_){ + JSONObject msg_ = new JSONObject(); + msg_.put("code", 200); + msg_.put("msg", "SUCCESS"); + msg_.put("method", "OFFLINE"); + msg_.put("data", new Object()); + boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 + if(b){ + NettyChannelMap.remove_(data_); + } + } } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("DRIVER_" + userId1, token); - } + NettyChannelMap.update("DRIVER" + userId1, ctx); + NettyChannelMap.update_(token.substring(token.length() - 16), ctx); + redisUtil.setStrValue("DRIVER_" + userId1, token); + } + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + NettyChannelMap.update("DRIVER" + userId1, ctx); } } } + + if(null != ctx && ctx.channel().isActive()){ + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); + } } //司机上传位置 if(method.equals(Method.location)){ Integer driverId = jsonCon.getInteger("driverId"); - String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId); + String fluid_control = redisUtil.getValue("location_" + driverId); if(!StringUtils.hasLength(fluid_control)){ - redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); + redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + ""); }else{ long l = System.currentTimeMillis() - Long.valueOf(fluid_control); if(l < 5000){ return; } - redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); + redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + ""); } - + Integer orderId = jsonCon.getInteger("orderId"); Integer orderType = jsonCon.getInteger("orderType"); Double lon = jsonCon.getDouble("lon"); @@ -228,7 +207,7 @@ System.err.println("调用driver-server存储位置数据出错了"); } } - redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 30, TimeUnit.SECONDS);//实时位置存入redis中 + redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中 }else{ this.sendMsgToClient(ctx, "__error__" + msg.toString()); } @@ -239,7 +218,7 @@ } catch (Exception e) { if(isdebug) { - this.sendMsgToClient(ctx, "__error__" + msg.toString()); + NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } @@ -252,7 +231,7 @@ * @param msg * @author TaoNingBo */ - public void sendMsgToClient(ChannelHandlerContext ctx, String msg) { + public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; @@ -272,7 +251,9 @@ if(b){ NettyChannelMap.remove(ctx); } + return true; } + return sync.isSuccess(); } catch (Exception e) { System.err.println("推送发生异常,记录:"+msg); NettyChannelMap.remove(ctx); @@ -284,6 +265,7 @@ System.err.println("推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } + return false; } // **链接断开 将推送消息记录 -- Gitblit v1.7.1