MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -18,11 +18,6 @@ @RequestMapping("/netty") public class NettyController { @Autowired private NettyServerController nettyServerController; @Autowired private NettyWebSocketController nettyWebSocketController; /** @@ -37,12 +32,12 @@ if(type == 1){//用户端 ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 if(null != channel){ nettyWebSocketController.sendMsgToClient(channel, msg); NettyWebSocketController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } channel = NettyChannelMap.getData("USER" + id); if(null != channel){ nettyServerController.sendMsgToClient(channel, msg); NettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id)); @@ -53,7 +48,7 @@ System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map)); ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); if(null != channel){ nettyServerController.sendMsgToClient(channel, msg); NettyServerController.sendMsgToClient(channel, msg); return JSON.toJSONString(ResultUtil.success()); } return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id)); MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java
New file @@ -0,0 +1,51 @@ package com.sinata.push.util; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * Redis工具类 */ @Component public class RedisUtil { @Autowired private RedisTemplate<String, String> redisTemplate; /** * 向redis中存储字符串没有过期时间 * @param key * @param value */ public void setStrValue(String key, String value){ redisTemplate.opsForValue().set(key, value); } /** * 以分钟为单位设置存储值(设置过期时间) * @param key * @param value * @param time 秒 */ public void setStrValue(String key, String value, int time){ redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } /** * 从redis中获取值 * @param key * @return */ public String getValue(String key){ return redisTemplate.opsForValue().get(key); } } MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
@@ -4,25 +4,17 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.stream.ChunkedWriteHandler; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.io.File; public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // String path = "/root/server/app/cert/qytzt.cn.jks"; // String path = "C:\\Program Files\\Apache Software Foundation\\Tomcat 8.5\\cert\\SHA256withRSA_lzhyc.cn.pfx"; // String path = "/usr/local/server/app/cert/tomcat/scs1680576839056_chaoshengdaijia.com_server.jks"; // SSLContext sslContext = createSSLContext.createSSLContext("JKS" // , path, "bo27xqbr"); // , path, "Zf3^5v6OoWmOVgeQ"); //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 // SSLEngine engine = sslContext.createSSLEngine(); // //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信 // engine.setUseClientMode(false); // socketChannel.pipeline().addLast("ssl", new SslHandler(engine)); MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -2,43 +2,36 @@ import com.alibaba.fastjson.JSONObject; import com.sinata.push.util.RedisUtil; import com.sinata.push.util.SinataUtil; import com.sinata.push.util.SpringUtil; import com.sinata.push.util.StringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyMsg; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.HashMap; import java.util.Hashtable; import java.util.Timer; import java.util.TimerTask; @Component public class NettyWebSocketController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); @Resource private RedisTemplate<String, Object> redisTemplate; public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); public static Hashtable<String,String> table; static{ if(table == null){ table = new Hashtable<>(); } } private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); public static Hashtable<String, String> table; static { if (table == null) { table = new Hashtable<>(); } } public static boolean isdebug = false; public static int i = 0; @@ -50,78 +43,80 @@ * @param msg * @author TaoNingBo */ public void JudgeOperation(ChannelHandlerContext ctx, String msg) { public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) { try { // 验证即时通讯命令是否正确有效 if(SinataUtil.isEmpty(msg)) { return; } String msgStr = msg.toString(); if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { return; } if(isdebug) { // 验证即时通讯命令是否正确有效 if (SinataUtil.isEmpty(msg)) { return; } String msgStr = msg.toString(); if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { return; } if (isdebug) { // System.out.println("<<<--receive-->>>111" + msg); } } // 获取socket信息,保存相应的socket JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); int code = jsonMsg.getIntValue("code"); String message = jsonMsg.getString("msg"); String method = jsonMsg.getString("method"); if(code != 200 || !message.equals("SUCCESS")) { return; } JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); // ############################### 心跳 ############################ // 心跳 if(method.equals(Method.ping)){ String token = jsonCon.getString("token"); String userId1 = jsonCon.getString("userId"); if(StringUtil.isNotEmpty(userId1)){ //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1); JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(context, msg_.toJSONString()); TimerTask timerTask = new TimerTask() { @Override public void run() { NettyChannelMap.remove(context); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisTemplate.opsForValue().set("USER_" + userId1, token); } } // 获取socket信息,保存相应的socket JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); int code = jsonMsg.getIntValue("code"); String message = jsonMsg.getString("msg"); String method = jsonMsg.getString("method"); if (code != 200 || !message.equals("SUCCESS")) { return; } JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); //存储业务使用的通道 if(null != ctx && ctx.channel().isActive()) { NettyChannelMap.update("Applets" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } if (null != ctx && ctx.channel().isActive()) { jsonMsg.put("method", Method.pong); sendMsgToClient(ctx, jsonMsg.toJSONString()); } // ############################### 心跳 ############################ // 心跳 if (method.equals(Method.ping)) { String token = jsonCon.getString("token"); String userId1 = jsonCon.getString("userId"); if (StringUtil.isNotEmpty(userId1)) { //确保账号在单个设备上登录 if (StringUtil.isNotEmpty(token)) { String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据 if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据 ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(data_, msg_.toJSONString()); new Timer().schedule(new TimerTask() { @Override public void run() { NettyChannelMap.remove_(data_); } }, 5000); } NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道 NettyChannelMap.update("Applets" + userId1, ctx); redisUtil.setStrValue("USER_Applets_" + userId1, token); } //存储业务使用的通道 if (null != ctx && ctx.channel().isActive()) { NettyChannelMap.update("Applets" + userId1, ctx); } } } } catch (Exception e) { if (isdebug) { NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } } catch (Exception e) { if(isdebug) { this.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } } /** * 向客户端发送消息 @@ -137,67 +132,69 @@ ChannelFuture sync; try { sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); if(!sync.isSuccess()){ if (!sync.isSuccess()) { boolean b = true; for (int i = 0; i < 10; i++) { ctx.wait(3000); sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); if(sync.isSuccess()){ if (sync.isSuccess()) { b = false; break; } System.err.println("小程序-》推送不成功,将继续推送"+msg); System.err.println("小程序-》推送不成功,将继续推送" + msg); } if(b){ if (b) { NettyChannelMap.remove(ctx); } } } catch (Exception e) { System.err.println("小程序-》推送发生异常,记录:"+msg); System.err.println("小程序-》推送发生异常,记录:" + msg); NettyChannelMap.remove(ctx); } if(isdebug) { System.err.println("小程序-》 <<<--send-->>>" + msg) ; if (isdebug) { System.err.println("小程序-》 <<<--send-->>>" + msg); } }else{ } else { System.err.println("小程序-》推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } } // **链接断开 将推送消息记录 public static void sendMsgToClient(String cacheType, Integer id,String msg) { public static void sendMsgToClient(String cacheType, Integer id, String msg) { ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); if (ctx != null) { ChannelFuture sync; try { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); if(!sync.isSuccess()){ if (!sync.isSuccess()) { for (int i = 0; i < 10; i++) { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; if(!sync.isSuccess()){ sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; System.err.println("推送不成功,将继续推送"+msg); if(i == 9){ table.put(cacheType+id, msg); sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); ; if (!sync.isSuccess()) { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); ; System.err.println("推送不成功,将继续推送" + msg); if (i == 9) { table.put(cacheType + id, msg); ctx.close(); System.err.println("推送发生异常,记录:"+msg); System.err.println("推送发生异常,记录:" + msg); } }else{ } else { break; } } } } catch (Exception e) { table.put(cacheType+id, msg); System.err.println("推送发生异常,记录:"+msg); table.put(cacheType + id, msg); System.err.println("推送发生异常,记录:" + msg); } if(isdebug) { if (isdebug) { System.err.println("<<<--send-->>>" + msg); } }else{ table.put(cacheType+id, msg); System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg); } else { table.put(cacheType + id, msg); System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); } } } MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
@@ -1,7 +1,6 @@ package com.sinata.push.util.applets; import com.alibaba.fastjson.JSONObject; import com.sinata.push.util.SpringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyMsg; @@ -16,7 +15,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.HashMap; @@ -26,8 +24,6 @@ private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket"; // private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class); @@ -77,13 +73,8 @@ } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { HttpHeaders headers = req.headers(); headers.names().forEach(name -> { System.out.println(name + ":" + headers.get(name)); }); // Http解码失败,向服务器指定传输的协议为Upgrade:websocket if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){ System.out.println("Http解码失败"); sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } @@ -92,11 +83,9 @@ // 根据工厂类和HTTP请求创建握手类 handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { System.out.println("不支持websocket"); // 不支持websocket WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { System.out.println("通过它构造握手响应消息返回给客户端"); // 通过它构造握手响应消息返回给客户端 handshaker.handshake(ctx.channel(), req); } @@ -123,21 +112,18 @@ //给连接的客户端返回数据 //返回心跳 // JSONObject jsonObject = new JSONObject(); // jsonObject.put("code", 200); // jsonObject.put("method", Method.ok); // jsonObject.put("msg", "SUCCESS"); // jsonObject.put("data", new JSONObject()); // TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); JSONObject jsonObject = new JSONObject(); jsonObject.put("code", 200); jsonObject.put("method", Method.ok); jsonObject.put("msg", "SUCCESS"); jsonObject.put("data", new JSONObject()); TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString()); // ctx.channel().writeAndFlush(tws); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); // nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理 new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理 // 群发服务端心跳响应 // Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); Global.group.writeAndFlush(new TextWebSocketFrame((tws).text())); } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
@@ -1,6 +1,5 @@ package com.sinata.push.util.echo; import com.sinata.push.util.SpringUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -8,14 +7,13 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import org.springframework.data.redis.core.RedisTemplate; import java.net.InetSocketAddress; import java.util.HashMap; public class DiscardServerHandler extends SimpleChannelInboundHandler<String> { private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class); private NettyServerController nettyServerController = new NettyServerController(); public static boolean isdebug = true; @@ -23,7 +21,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); if(isdebug) { System.err.println(insocket.getAddress() + ": 收到客户端数据......."); // System.err.println(insocket.getAddress() + ": 收到客户端数据......."); } try { // 调用service @@ -39,7 +37,7 @@ protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); if(isdebug) { System.err.println(insocket.getAddress() + ": 收到客户端数据......."); // System.err.println(insocket.getAddress() + ": 收到客户端数据......."); } try { // 调用service @@ -70,7 +68,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); if(isdebug) { System.err.println(insocket.getAddress() + ": Connect successful......"); // System.err.println(insocket.getAddress() + ": Connect successful......"); } } @@ -106,7 +104,7 @@ // System.err.println(insocket.getAddress() + ": Disconnect connection......"); } NettyChannelMap.remove(ctx); System.err.println("清除通道" + ctx); // System.err.println("清除通道" + ctx); // super.channelInactive(ctx); } MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java
@@ -15,6 +15,9 @@ /** 心跳【接收】 */ public final static String ping = "PING"; /** 心跳【响应】 */ public final static String pong = "PONG"; /** 司机上传位置 */ public static final String location = "LOCATION"; MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -26,7 +26,7 @@ */ public static ChannelHandlerContext getData(String key) { if(map==null){ map = new ConcurrentHashMap<String, ChannelHandlerContext>(); map = new HashMap<String, ChannelHandlerContext>(); } return map.get(key); } @@ -78,6 +78,9 @@ */ @SuppressWarnings("rawtypes") public static synchronized void remove(ChannelHandlerContext value) { if(null == value){ return; } Set<String> strings = map.keySet(); for(String key : strings){ ChannelHandlerContext channelHandlerContext = map.get(key); @@ -91,6 +94,23 @@ } public static synchronized void remove_(ChannelHandlerContext value) { Set<String> strings = ctxMap.keySet(); for(String key : strings){ ChannelHandlerContext channelHandlerContext = ctxMap.get(key); String s = channelHandlerContext.channel().remoteAddress().toString(); String s1 = value.channel().remoteAddress().toString(); if(s.equals(s1)){ channelHandlerContext.close();//关闭通道 ctxMap.remove(key); } } } public static synchronized void remove_(String key) { ctxMap.remove(key); } /** @@ -113,4 +133,9 @@ map.put(key, value); } public static synchronized void update_(String key, ChannelHandlerContext value) { ctxMap.put(key, value); } } MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java
@@ -36,7 +36,7 @@ public void run() { thread.start(); } }, 10000); }, 3999); } /** @@ -61,7 +61,7 @@ bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 通过TCP_NODELAY禁用NAGLE,使消息立即发出去,不用等待到一定的数据量才发出去 bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.TCP_NODELAY, true); // 保持长连接状态 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ServerInit() { MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -10,16 +10,16 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -28,20 +28,17 @@ * @createDate 2016年6月3日 * @version 1.0 */ @Component public class NettyServerController { public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); public static Hashtable<String,String> table; @Resource private RedisTemplate<String, Object> redisTemplate; private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); static{ @@ -61,7 +58,7 @@ * @param msg * @author TaoNingBo */ public void JudgeOperation(ChannelHandlerContext ctx, Object msg) { public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) { try { // ByteBuf转String ByteBuf byteBuf = (ByteBuf) msg; @@ -99,105 +96,87 @@ String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); if(StringUtil.isNotEmpty(userId1)){ String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); if(!StringUtils.hasLength(fluid_control)){ redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); }else{ long l = System.currentTimeMillis() - Long.valueOf(fluid_control); if(l >= 10000){ redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); }else{ String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); return; } } //判断用户或者司机长连接 //判断用户或者司机长连接 if(type==1){ //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("USER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1); ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 if(b){ NettyChannelMap.remove_(data_); } new Timer().schedule(new TimerTask() { @Override public void run() { NettyChannelMap.remove(context); NettyChannelMap.remove_(data_); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); }, 5000); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisTemplate.opsForValue().set("USER_" + userId1, token); } NettyChannelMap.update_(token.substring(token.length() - 16), ctx); NettyChannelMap.update("USER" + userId1, ctx); redisUtil.setStrValue("USER_APP_" + userId1, token); } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ NettyChannelMap.update("USER" + userId1, ctx); } }else{ //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } //确保账号在单个设备上登录 String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作 String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token)){//APP端登录的操作 String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据 ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1); JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { NettyChannelMap.remove(context); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); if(null != data_){ JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 if(b){ NettyChannelMap.remove_(data_); } } } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisTemplate.opsForValue().set("DRIVER_" + userId1, token); } NettyChannelMap.update("DRIVER" + userId1, ctx); NettyChannelMap.update_(token.substring(token.length() - 16), ctx); redisUtil.setStrValue("DRIVER_" + userId1, token); } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ NettyChannelMap.update("DRIVER" + userId1, ctx); } } } if(null != ctx && ctx.channel().isActive()){ jsonMsg.put("method", Method.pong); sendMsgToClient(ctx, jsonMsg.toJSONString()); } } //司机上传位置 if(method.equals(Method.location)){ Integer driverId = jsonCon.getInteger("driverId"); String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId); String fluid_control = redisUtil.getValue("location_" + driverId); if(!StringUtils.hasLength(fluid_control)){ redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + ""); }else{ long l = System.currentTimeMillis() - Long.valueOf(fluid_control); if(l < 5000){ return; } redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + ""); } Integer orderId = jsonCon.getInteger("orderId"); Integer orderType = jsonCon.getInteger("orderType"); Double lon = jsonCon.getDouble("lon"); @@ -228,7 +207,7 @@ System.err.println("调用driver-server存储位置数据出错了"); } } redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 30, TimeUnit.SECONDS);//实时位置存入redis中 redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中 }else{ this.sendMsgToClient(ctx, "__error__" + msg.toString()); } @@ -239,7 +218,7 @@ } catch (Exception e) { if(isdebug) { this.sendMsgToClient(ctx, "__error__" + msg.toString()); NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); } e.printStackTrace(); } @@ -252,7 +231,7 @@ * @param msg * @author TaoNingBo */ public void sendMsgToClient(ChannelHandlerContext ctx, String msg) { public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; @@ -272,7 +251,9 @@ if(b){ NettyChannelMap.remove(ctx); } return true; } return sync.isSuccess(); } catch (Exception e) { System.err.println("推送发生异常,记录:"+msg); NettyChannelMap.remove(ctx); @@ -284,6 +265,7 @@ System.err.println("推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } return false; } // **链接断开 将推送消息记录