From 4872bb7719c4ccaaab99438af3d987787c818c2a Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期二, 05 八月 2025 23:14:48 +0800
Subject: [PATCH] 提交推送服务

---
 MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java            |   11 
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java             |   27 ++
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java       |  146 +++++++---------
 MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java |  209 +++++++++++-----------
 MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java      |   16 -
 MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java                        |   51 +++++
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java                 |    4 
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java                      |    3 
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java        |   12 
 MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java         |   32 +--
 10 files changed, 270 insertions(+), 241 deletions(-)

diff --git a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
index 7056cec..c39c8c9 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -18,11 +18,6 @@
 @RequestMapping("/netty")
 public class NettyController {
     
-    @Autowired
-    private NettyServerController nettyServerController;
-    
-    @Autowired
-    private NettyWebSocketController nettyWebSocketController;
 
 
     /**
@@ -37,12 +32,12 @@
         if(type == 1){//用户端
             ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序
             if(null != channel){
-                nettyWebSocketController.sendMsgToClient(channel, msg);
+                NettyWebSocketController.sendMsgToClient(channel, msg);
                 return JSON.toJSONString(ResultUtil.success());
             }
             channel = NettyChannelMap.getData("USER" + id);
             if(null != channel){
-                nettyServerController.sendMsgToClient(channel, msg);
+	            NettyServerController.sendMsgToClient(channel, msg);
                 return JSON.toJSONString(ResultUtil.success());
             }
             return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id));
@@ -53,7 +48,7 @@
             System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map));
             ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id);
             if(null != channel){
-                nettyServerController.sendMsgToClient(channel, msg);
+                NettyServerController.sendMsgToClient(channel, msg);
                 return JSON.toJSONString(ResultUtil.success());
             }
             return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id));
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java b/MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java
new file mode 100644
index 0000000..1485749
--- /dev/null
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java
@@ -0,0 +1,51 @@
+package com.sinata.push.util;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Redis工具类
+ */
+@Component
+public class RedisUtil {
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+
+    /**
+     * 向redis中存储字符串没有过期时间
+     * @param key
+     * @param value
+     */
+    public void setStrValue(String key, String value){
+        redisTemplate.opsForValue().set(key, value);
+    }
+
+
+    /**
+     * 以分钟为单位设置存储值(设置过期时间)
+     * @param key
+     * @param value
+     * @param time 秒
+     */
+    public void setStrValue(String key, String value, int time){
+        redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
+    }
+
+
+    /**
+     * 从redis中获取值
+     * @param key
+     * @return
+     */
+    public String getValue(String key){
+        return redisTemplate.opsForValue().get(key);
+    }
+
+
+}
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
index 5ab55f9..f7bbac0 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
@@ -4,25 +4,17 @@
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.handler.stream.ChunkedWriteHandler;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import java.io.File;
 
 public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
     @Override
     protected void initChannel(SocketChannel socketChannel) throws Exception {
-//        String path = "/root/server/app/cert/qytzt.cn.jks";
+//        String path = "C:\\Program Files\\Apache Software Foundation\\Tomcat 8.5\\cert\\SHA256withRSA_lzhyc.cn.pfx";
+//        String path = "/usr/local/server/app/cert/tomcat/scs1680576839056_chaoshengdaijia.com_server.jks";
 //        SSLContext sslContext = createSSLContext.createSSLContext("JKS"
-//                , path, "bo27xqbr");
+//                , path, "Zf3^5v6OoWmOVgeQ");
+        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
 //        SSLEngine engine = sslContext.createSSLEngine();
-//        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
 //        engine.setUseClientMode(false);
 //        socketChannel.pipeline().addLast("ssl", new SslHandler(engine));
 
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
index dddbca5..0e6ac87 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -2,43 +2,36 @@
 
 
 import com.alibaba.fastjson.JSONObject;
+import com.sinata.push.util.RedisUtil;
 import com.sinata.push.util.SinataUtil;
 import com.sinata.push.util.SpringUtil;
 import com.sinata.push.util.StringUtil;
 import com.sinata.push.util.echo.Method;
 import com.sinata.push.util.echo.NettyChannelMap;
-import com.sinata.push.util.echo.NettyMsg;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Timer;
 import java.util.TimerTask;
 
-@Component
 public class NettyWebSocketController {
 
-    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
-    
-    @Resource
-    private RedisTemplate<String, Object> redisTemplate;
+    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
 
-	public static Hashtable<String,String> table;
-	static{
-		if(table == null){
-			table = new Hashtable<>();
-		}
-	}
-	
+    private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
+
+    public static Hashtable<String, String> table;
+
+    static {
+        if (table == null) {
+            table = new Hashtable<>();
+        }
+    }
+
     public static boolean isdebug = false;
     public static int i = 0;
 
@@ -50,78 +43,80 @@
      * @param msg
      * @author TaoNingBo
      */
-    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
+    public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) {
         try {
-        // 验证即时通讯命令是否正确有效
-        if(SinataUtil.isEmpty(msg)) {
-            return;
-        }
-        String msgStr = msg.toString();
-        if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
-            return;
-        }
-        if(isdebug) {
+            // 验证即时通讯命令是否正确有效
+            if (SinataUtil.isEmpty(msg)) {
+                return;
+            }
+            String msgStr = msg.toString();
+            if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
+                return;
+            }
+            if (isdebug) {
 //            System.out.println("<<<--receive-->>>111" + msg);
-        }
+            }
 
-        // 获取socket信息,保存相应的socket
-        JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
-        int code = jsonMsg.getIntValue("code");
-        String message = jsonMsg.getString("msg");
-        String method = jsonMsg.getString("method");
-        if(code != 200 || !message.equals("SUCCESS")) {
-            return;
-        }
-        JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
-        // ############################### 心跳  ############################
-        // 心跳
-        if(method.equals(Method.ping)){
-            String token = jsonCon.getString("token");
-            String userId1 = jsonCon.getString("userId");
-            if(StringUtil.isNotEmpty(userId1)){
-                //确保账号在单个设备上登录
-                if(StringUtil.isNotEmpty(token)){
-                    String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
-                    if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
-                        ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1);
-                        JSONObject msg_ = new JSONObject();
-                        msg_.put("code", 200);
-                        msg_.put("msg", "SUCCESS");
-                        msg_.put("method", "OFFLINE");
-                        msg_.put("data", new Object());
-                        this.sendMsgToClient(context, msg_.toJSONString());
-                        TimerTask timerTask = new TimerTask() {
-                            @Override
-                            public void run() {
-                                NettyChannelMap.remove(context);
-                            }
-                        };
-                        Timer timer = new Timer();
-                        timer.schedule(timerTask, 3000);
-                        timer.cancel();
-                    }
-                    if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-                        redisTemplate.opsForValue().set("USER_" + userId1, token);
-                    }
-                }
+            // 获取socket信息,保存相应的socket
+            JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
+            int code = jsonMsg.getIntValue("code");
+            String message = jsonMsg.getString("msg");
+            String method = jsonMsg.getString("method");
+            if (code != 200 || !message.equals("SUCCESS")) {
+                return;
+            }
+            JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
 
-                //存储业务使用的通道
-                if(null != ctx && ctx.channel().isActive()) {
-                    NettyChannelMap.update("Applets" + userId1, ctx);
-                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-                }
+            if (null != ctx && ctx.channel().isActive()) {
+                jsonMsg.put("method", Method.pong);
+                sendMsgToClient(ctx, jsonMsg.toJSONString());
             }
 
 
+            // ############################### 心跳  ############################
+            // 心跳
+            if (method.equals(Method.ping)) {
+                String token = jsonCon.getString("token");
+                String userId1 = jsonCon.getString("userId");
+                if (StringUtil.isNotEmpty(userId1)) {
+                    //确保账号在单个设备上登录
+                    if (StringUtil.isNotEmpty(token)) {
+                        String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据
+                        if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据
+                            ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
+                            JSONObject msg_ = new JSONObject();
+                            msg_.put("code", 200);
+                            msg_.put("msg", "SUCCESS");
+                            msg_.put("method", "OFFLINE");
+                            msg_.put("data", new Object());
+                            this.sendMsgToClient(data_, msg_.toJSONString());
+                            new Timer().schedule(new TimerTask() {
+                                @Override
+                                public void run() {
+                                    NettyChannelMap.remove_(data_);
+                                }
+                            }, 5000);
+                        }
+                        NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道
+                        NettyChannelMap.update("Applets" + userId1, ctx);
+                        redisUtil.setStrValue("USER_Applets_" + userId1, token);
+                    }
+
+                    //存储业务使用的通道
+                    if (null != ctx && ctx.channel().isActive()) {
+                        NettyChannelMap.update("Applets" + userId1, ctx);
+                    }
+                }
+
+
+            }
+        } catch (Exception e) {
+            if (isdebug) {
+                NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
+            }
+            e.printStackTrace();
         }
-    } catch (Exception e) {
-        if(isdebug) {
-        	this.sendMsgToClient(ctx, "__error__" + msg.toString());
-        }
-        e.printStackTrace();
     }
-}
 
     /**
      * 向客户端发送消息
@@ -137,67 +132,69 @@
             ChannelFuture sync;
             try {
                 sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
-                if(!sync.isSuccess()){
+                if (!sync.isSuccess()) {
                     boolean b = true;
                     for (int i = 0; i < 10; i++) {
                         ctx.wait(3000);
                         sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
-                        if(sync.isSuccess()){
+                        if (sync.isSuccess()) {
                             b = false;
                             break;
                         }
-                        System.err.println("小程序-》推送不成功,将继续推送"+msg);
+                        System.err.println("小程序-》推送不成功,将继续推送" + msg);
                     }
-                    if(b){
+                    if (b) {
                         NettyChannelMap.remove(ctx);
                     }
                 }
             } catch (Exception e) {
-                System.err.println("小程序-》推送发生异常,记录:"+msg);
+                System.err.println("小程序-》推送发生异常,记录:" + msg);
                 NettyChannelMap.remove(ctx);
             }
-            if(isdebug) {
-                System.err.println("小程序-》 <<<--send-->>>" + msg) ;
+            if (isdebug) {
+                System.err.println("小程序-》 <<<--send-->>>" + msg);
             }
-        }else{
+        } else {
             System.err.println("小程序-》推送失败,长连接不存在");
             NettyChannelMap.remove(ctx);
         }
     }
 
     //	**链接断开 将推送消息记录
-    public static void sendMsgToClient(String cacheType, Integer id,String msg) {
+    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
         ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
         if (ctx != null) {
             ChannelFuture sync;
             try {
                 sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
-                if(!sync.isSuccess()){
+                if (!sync.isSuccess()) {
                     for (int i = 0; i < 10; i++) {
-                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
-                        if(!sync.isSuccess()){
-                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
-                            System.err.println("推送不成功,将继续推送"+msg);
-                            if(i == 9){
-                                table.put(cacheType+id, msg);
+                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
+                        ;
+                        if (!sync.isSuccess()) {
+                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
+                            ;
+                            System.err.println("推送不成功,将继续推送" + msg);
+                            if (i == 9) {
+                                table.put(cacheType + id, msg);
                                 ctx.close();
-                                System.err.println("推送发生异常,记录:"+msg);
+                                System.err.println("推送发生异常,记录:" + msg);
                             }
-                        }else{
+                        } else {
                             break;
                         }
                     }
                 }
             } catch (Exception e) {
-                table.put(cacheType+id, msg);
-                System.err.println("推送发生异常,记录:"+msg);
+                table.put(cacheType + id, msg);
+                System.err.println("推送发生异常,记录:" + msg);
             }
-            if(isdebug) {
+            if (isdebug) {
                 System.err.println("<<<--send-->>>" + msg);
             }
-        }else{
-            table.put(cacheType+id, msg);
-            System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg);
+        } else {
+            table.put(cacheType + id, msg);
+            System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
         }
     }
 }
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
index d4843ce..7283430 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
@@ -1,7 +1,6 @@
 package com.sinata.push.util.applets;
 
 import com.alibaba.fastjson.JSONObject;
-import com.sinata.push.util.SpringUtil;
 import com.sinata.push.util.echo.Method;
 import com.sinata.push.util.echo.NettyChannelMap;
 import com.sinata.push.util.echo.NettyMsg;
@@ -16,7 +15,6 @@
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.CharsetUtil;
-import org.springframework.data.redis.core.StringRedisTemplate;
 
 import java.util.HashMap;
 
@@ -26,8 +24,6 @@
     private WebSocketServerHandshaker handshaker;
 
     private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
-    
-//    private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class);
 
 
 
@@ -77,13 +73,8 @@
     }
 
     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
-        HttpHeaders headers = req.headers();
-        headers.names().forEach(name -> {
-            System.out.println(name + ":" + headers.get(name));
-        });
         // Http解码失败,向服务器指定传输的协议为Upgrade:websocket
         if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){
-            System.out.println("Http解码失败");
             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
             return;
         }
@@ -92,11 +83,9 @@
         // 根据工厂类和HTTP请求创建握手类
         handshaker = wsFactory.newHandshaker(req);
         if (handshaker == null) {
-            System.out.println("不支持websocket");
             // 不支持websocket
             WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
         } else {
-            System.out.println("通过它构造握手响应消息返回给客户端");
             // 通过它构造握手响应消息返回给客户端
             handshaker.handshake(ctx.channel(), req);
         }
@@ -123,21 +112,18 @@
 
         //给连接的客户端返回数据
         //返回心跳
-//        JSONObject jsonObject = new JSONObject();
-//        jsonObject.put("code", 200);
-//        jsonObject.put("method", Method.ok);
-//        jsonObject.put("msg", "SUCCESS");
-//        jsonObject.put("data", new JSONObject());
-//        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("code", 200);
+        jsonObject.put("method", Method.ok);
+        jsonObject.put("msg", "SUCCESS");
+        jsonObject.put("data", new JSONObject());
+        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
 //        ctx.channel().writeAndFlush(tws);
-        String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-        ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-        
-    
-//        nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理
+
+        new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理
 
         // 群发服务端心跳响应
-//        Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
+        Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
     }
 
     private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
index c2a291d..797138d 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
@@ -1,6 +1,5 @@
 package com.sinata.push.util.echo;
 
-import com.sinata.push.util.SpringUtil;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
@@ -8,14 +7,13 @@
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.ReferenceCountUtil;
-import org.springframework.data.redis.core.RedisTemplate;
 
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 
 public class DiscardServerHandler extends SimpleChannelInboundHandler<String>  {
 
-	private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class);
+	private NettyServerController nettyServerController = new NettyServerController();
 	
 	public static boolean isdebug = true;
 
@@ -23,7 +21,7 @@
 	public void channelRead(ChannelHandlerContext ctx, Object msg) {
 		InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
 		if(isdebug) {
-			System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
+//			System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
 		}
 		try {
 			// 调用service
@@ -39,7 +37,7 @@
 	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
 		InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
 		if(isdebug) {
-			System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
+//			System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
 		}
 		try {
 			// 调用service
@@ -70,7 +68,7 @@
 	public void channelActive(final ChannelHandlerContext ctx) throws Exception {
 		InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
 		if(isdebug) {
-			System.err.println(insocket.getAddress() + ": Connect successful......");
+//			System.err.println(insocket.getAddress() + ": Connect successful......");
 		}
 	}
 
@@ -106,7 +104,7 @@
 //			System.err.println(insocket.getAddress() + ": Disconnect connection......");
 		}
 		NettyChannelMap.remove(ctx);
-		System.err.println("清除通道" + ctx);
+//		System.err.println("清除通道" + ctx);
 //		super.channelInactive(ctx);
 	}
 
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java
index cea0d88..b5f75d5 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java
@@ -15,6 +15,9 @@
 	/** 心跳【接收】 */
 	public final static String ping = "PING";
 
+	/** 心跳【响应】 */
+	public final static String pong = "PONG";
+
 	/** 司机上传位置 */
 	public static final String location = "LOCATION";
 
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
index 228c11d..a0dd469 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -26,7 +26,7 @@
 	 */
 	public static ChannelHandlerContext getData(String key) {
 		if(map==null){
-			map = new ConcurrentHashMap<String, ChannelHandlerContext>();
+			map = new HashMap<String, ChannelHandlerContext>();
 		}
 		return map.get(key);
 	}
@@ -78,6 +78,9 @@
 	 */
 	@SuppressWarnings("rawtypes")
 	public static synchronized void remove(ChannelHandlerContext value) {
+		if(null == value){
+			return;
+		}
 		Set<String> strings = map.keySet();
 		for(String key : strings){
 			ChannelHandlerContext channelHandlerContext = map.get(key);
@@ -91,6 +94,23 @@
 	}
 
 
+	public static synchronized void remove_(ChannelHandlerContext value) {
+		Set<String> strings = ctxMap.keySet();
+		for(String key : strings){
+			ChannelHandlerContext channelHandlerContext = ctxMap.get(key);
+			String s = channelHandlerContext.channel().remoteAddress().toString();
+			String s1 = value.channel().remoteAddress().toString();
+			if(s.equals(s1)){
+				channelHandlerContext.close();//关闭通道
+				ctxMap.remove(key);
+			}
+		}
+	}
+
+
+	public static synchronized void remove_(String key) {
+		ctxMap.remove(key);
+	}
 
 	
 	/**
@@ -113,4 +133,9 @@
 		map.put(key, value);
 	}
 
+
+
+	public static synchronized void update_(String key, ChannelHandlerContext value) {
+		ctxMap.put(key, value);
+	}
 }
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java
index 40206a7..94b0a31 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java
@@ -36,7 +36,7 @@
 			public void run() {
 				thread.start();
 			}
-		}, 10000);
+		}, 3999);
 	}
 
 	/**
@@ -61,7 +61,7 @@
 				bootstrap.channel(NioServerSocketChannel.class);
 				bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
 				// 通过TCP_NODELAY禁用NAGLE,使消息立即发出去,不用等待到一定的数据量才发出去
-				bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+				bootstrap.option(ChannelOption.TCP_NODELAY, true);
 				// 保持长连接状态
 				bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
 				bootstrap.childHandler(new ServerInit() {
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
index c4ae7ad..0f10527 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -10,16 +10,16 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
-import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 import org.springframework.util.StringUtils;
+import org.springframework.web.client.RestTemplate;
 
-import javax.annotation.Resource;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -28,20 +28,17 @@
  * @createDate 2016年6月3日
  * @version 1.0
  */
-@Component
 public class NettyServerController {
 	
 	public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
 
 	public static Hashtable<String,String> table;
-	
-	@Resource
-	private RedisTemplate<String, Object> redisTemplate;
-	
 
+	private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
+
+
+	
 	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-
-
 
 
 	static{
@@ -61,7 +58,7 @@
 	 * @param msg
 	 * @author TaoNingBo
 	 */
-	public void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
+	public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
         try {
         	// ByteBuf转String
         	ByteBuf byteBuf = (ByteBuf) msg;
@@ -99,105 +96,87 @@
 				String device = jsonCon.getString("device");
 				String version = jsonCon.getString("version");
                 if(StringUtil.isNotEmpty(userId1)){
-					String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
-					if(!StringUtils.hasLength(fluid_control)){
-						redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
-					}else{
-						long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
-						if(l >= 10000){
-							redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
-						}else{
-							String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-							ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-							return;
-						}
-					}
 
-					//判断用户或者司机长连接
+                    //判断用户或者司机长连接
                     if(type==1){
-	                    //存储通讯通道
-	                    if(null != ctx && ctx.channel().isActive()){
-		                    System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
-		                    NettyChannelMap.update("USER" + userId1, ctx);
-		                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-		                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-	                    }
                         //确保账号在单个设备上登录
                         if(StringUtil.isNotEmpty(token)){
-                            String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
+                            String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据
                             if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
-								ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1);
+								ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
 								JSONObject msg_ = new JSONObject();
                                 msg_.put("code", 200);
                                 msg_.put("msg", "SUCCESS");
                                 msg_.put("method", "OFFLINE");
                                 msg_.put("data", new Object());
-								this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
-                                TimerTask timerTask = new TimerTask() {
+								boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
+								if(b){
+									NettyChannelMap.remove_(data_);
+								}
+								new Timer().schedule(new TimerTask() {
                                     @Override
                                     public void run() {
-										NettyChannelMap.remove(context);
+                                        NettyChannelMap.remove_(data_);
                                     }
-                                };
-                                Timer timer = new Timer();
-                                timer.schedule(timerTask, 3000);
-                                timer.cancel();
+                                }, 5000);
                             }
-                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-                                redisTemplate.opsForValue().set("USER_" + userId1, token);
-                            }
+                            NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
+                            NettyChannelMap.update("USER" + userId1, ctx);
+							redisUtil.setStrValue("USER_APP_" + userId1, token);
                         }
+
+						//存储通讯通道
+						if(null != ctx && ctx.channel().isActive()){
+							NettyChannelMap.update("USER" + userId1, ctx);
+						}
                     }else{
-	                    //存储通讯通道
-	                    if(null != ctx && ctx.channel().isActive()){
-		                    System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
-		                    NettyChannelMap.update("DRIVER" + userId1, ctx);
-		                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-		                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-	                    }
                         //确保账号在单个设备上登录
-						String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
-						if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
-                            String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
+						if(StringUtil.isNotEmpty(token)){//APP端登录的操作
+                            String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
                             if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
-								ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1);
-								JSONObject msg_ = new JSONObject();
-                                msg_.put("code", 200);
-                                msg_.put("msg", "SUCCESS");
-                                msg_.put("method", "OFFLINE");
-                                msg_.put("data", new Object());
-								this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
-                                TimerTask timerTask = new TimerTask() {
-                                    @Override
-                                    public void run() {
-										NettyChannelMap.remove(context);
-                                    }
-                                };
-                                Timer timer = new Timer();
-                                timer.schedule(timerTask, 3000);
-                                timer.cancel();
+                                ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
+                                if(null != data_){
+									JSONObject msg_ = new JSONObject();
+									msg_.put("code", 200);
+									msg_.put("msg", "SUCCESS");
+									msg_.put("method", "OFFLINE");
+									msg_.put("data", new Object());
+									boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
+									if(b){
+										NettyChannelMap.remove_(data_);
+									}
+								}
                             }
-                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-                                redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
-                            }
+                            NettyChannelMap.update("DRIVER" + userId1, ctx);
+                            NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
+							redisUtil.setStrValue("DRIVER_" + userId1, token);
+                        }
+						//存储通讯通道
+                        if(null != ctx && ctx.channel().isActive()){
+                            NettyChannelMap.update("DRIVER" + userId1, ctx);
                         }
                     }
                 }
+
+				if(null != ctx && ctx.channel().isActive()){
+					jsonMsg.put("method", Method.pong);
+					sendMsgToClient(ctx, jsonMsg.toJSONString());
+				}
 			}
 			//司机上传位置
 			if(method.equals(Method.location)){
 				Integer driverId = jsonCon.getInteger("driverId");
-				String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId);
+				String fluid_control = redisUtil.getValue("location_" + driverId);
 				if(!StringUtils.hasLength(fluid_control)){
-					redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+					redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + "");
 				}else{
 					long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
 					if(l < 5000){
 						return;
 					}
-					redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+					redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + "");
 				}
-
+				
 				Integer orderId = jsonCon.getInteger("orderId");
 				Integer orderType = jsonCon.getInteger("orderType");
 				Double lon = jsonCon.getDouble("lon");
@@ -228,7 +207,7 @@
 								System.err.println("调用driver-server存储位置数据出错了");
 							}
 						}
-                        redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 30, TimeUnit.SECONDS);//实时位置存入redis中
+						redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中
 					}else{
 						this.sendMsgToClient(ctx, "__error__" + msg.toString());
 					}
@@ -239,7 +218,7 @@
 			
 		} catch (Exception e) {
 			if(isdebug) {
-				this.sendMsgToClient(ctx, "__error__" + msg.toString());
+				NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
 			}
 			e.printStackTrace();
 		}
@@ -252,7 +231,7 @@
 	 * @param msg
 	 * @author TaoNingBo
 	 */
-	public void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
+	public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) {
 		if (ctx != null && ctx.channel().isActive()) {
 			ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
 			ChannelFuture sync;
@@ -272,7 +251,9 @@
 					if(b){
 						NettyChannelMap.remove(ctx);
 					}
+					return true;
 				}
+				return sync.isSuccess();
 			} catch (Exception e) {
 				System.err.println("推送发生异常,记录:"+msg);
 				NettyChannelMap.remove(ctx);
@@ -284,6 +265,7 @@
 			System.err.println("推送失败,长连接不存在");
 			NettyChannelMap.remove(ctx);
 		}
+		return false;
 	}
 
 //	**链接断开 将推送消息记录

--
Gitblit v1.7.1