Pu Zhibing
2025-08-05 4872bb7719c4ccaaab99438af3d987787c818c2a
提交推送服务
9个文件已修改
1个文件已添加
511 ■■■■ 已修改文件
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java 209 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java 146 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -18,11 +18,6 @@
@RequestMapping("/netty")
public class NettyController {
    
    @Autowired
    private NettyServerController nettyServerController;
    @Autowired
    private NettyWebSocketController nettyWebSocketController;
    /**
@@ -37,12 +32,12 @@
        if(type == 1){//用户端
            ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序
            if(null != channel){
                nettyWebSocketController.sendMsgToClient(channel, msg);
                NettyWebSocketController.sendMsgToClient(channel, msg);
                return JSON.toJSONString(ResultUtil.success());
            }
            channel = NettyChannelMap.getData("USER" + id);
            if(null != channel){
                nettyServerController.sendMsgToClient(channel, msg);
                NettyServerController.sendMsgToClient(channel, msg);
                return JSON.toJSONString(ResultUtil.success());
            }
            return JSON.toJSONString(ResultUtil.error("推送失败-----用户id=" + id));
@@ -53,7 +48,7 @@
            System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map));
            ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id);
            if(null != channel){
                nettyServerController.sendMsgToClient(channel, msg);
                NettyServerController.sendMsgToClient(channel, msg);
                return JSON.toJSONString(ResultUtil.success());
            }
            return JSON.toJSONString(ResultUtil.error("推送失败-----司机id=" + id));
MessagePushTravel/src/main/java/com/sinata/push/util/RedisUtil.java
New file
@@ -0,0 +1,51 @@
package com.sinata.push.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * Redis helper: a thin wrapper around an injected
 * {@code RedisTemplate<String, String>} for reading and writing string values.
 */
@Component
public class RedisUtil {
    // Injected by Spring (field injection).
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /**
     * Stores a string value in Redis without passing any expiry.
     * @param key   key to write
     * @param value value to store under {@code key}
     */
    public void setStrValue(String key, String value){
        redisTemplate.opsForValue().set(key, value);
    }
    /**
     * Stores a string value in Redis with an expiry.
     * The timeout is expressed in SECONDS (the call passes {@code TimeUnit.SECONDS});
     * the previous comment's "minutes" wording did not match the code.
     * @param key   key to write
     * @param value value to store under {@code key}
     * @param time  time-to-live in seconds; forwarded unchanged to the template
     *              (NOTE(review): no validation of zero/negative values here — confirm callers)
     */
    public void setStrValue(String key, String value, int time){
        redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
    }
    /**
     * Reads the string value stored under a key.
     * @param key key to read
     * @return whatever {@code opsForValue().get(key)} returns
     *         (presumably {@code null} when the key is absent — see ValueOperations docs)
     */
    public String getValue(String key){
        return redisTemplate.opsForValue().get(key);
    }
}
MessagePushTravel/src/main/java/com/sinata/push/util/applets/ChildChannelHandler.java
@@ -4,25 +4,17 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.stream.ChunkedWriteHandler;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.File;
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
//        String path = "/root/server/app/cert/qytzt.cn.jks";
//        String path = "C:\\Program Files\\Apache Software Foundation\\Tomcat 8.5\\cert\\SHA256withRSA_lzhyc.cn.pfx";
//        String path = "/usr/local/server/app/cert/tomcat/scs1680576839056_chaoshengdaijia.com_server.jks";
//        SSLContext sslContext = createSSLContext.createSSLContext("JKS"
//                , path, "bo27xqbr");
//                , path, "Zf3^5v6OoWmOVgeQ");
        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
//        SSLEngine engine = sslContext.createSSLEngine();
//        //SSLEngine 此类允许使用ssl安全套接层协议进行安全通信
//        engine.setUseClientMode(false);
//        socketChannel.pipeline().addLast("ssl", new SslHandler(engine));
MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -2,43 +2,36 @@
import com.alibaba.fastjson.JSONObject;
import com.sinata.push.util.RedisUtil;
import com.sinata.push.util.SinataUtil;
import com.sinata.push.util.SpringUtil;
import com.sinata.push.util.StringUtil;
import com.sinata.push.util.echo.Method;
import com.sinata.push.util.echo.NettyChannelMap;
import com.sinata.push.util.echo.NettyMsg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Timer;
import java.util.TimerTask;
@Component
public class NettyWebSocketController {
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
    public static Hashtable<String,String> table;
    static{
        if(table == null){
            table = new Hashtable<>();
        }
    }
    private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
    public static Hashtable<String, String> table;
    static {
        if (table == null) {
            table = new Hashtable<>();
        }
    }
    public static boolean isdebug = false;
    public static int i = 0;
@@ -50,78 +43,80 @@
     * @param msg
     * @author TaoNingBo
     */
    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
    public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) {
        try {
        // 验证即时通讯命令是否正确有效
        if(SinataUtil.isEmpty(msg)) {
            return;
        }
        String msgStr = msg.toString();
        if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
            return;
        }
        if(isdebug) {
            // 验证即时通讯命令是否正确有效
            if (SinataUtil.isEmpty(msg)) {
                return;
            }
            String msgStr = msg.toString();
            if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
                return;
            }
            if (isdebug) {
//            System.out.println("<<<--receive-->>>111" + msg);
        }
            }
        // 获取socket信息,保存相应的socket
        JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
        int code = jsonMsg.getIntValue("code");
        String message = jsonMsg.getString("msg");
        String method = jsonMsg.getString("method");
        if(code != 200 || !message.equals("SUCCESS")) {
            return;
        }
        JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
        // ############################### 心跳  ############################
        // 心跳
        if(method.equals(Method.ping)){
            String token = jsonCon.getString("token");
            String userId1 = jsonCon.getString("userId");
            if(StringUtil.isNotEmpty(userId1)){
                //确保账号在单个设备上登录
                if(StringUtil.isNotEmpty(token)){
                    String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
                    if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
                        ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1);
                        JSONObject msg_ = new JSONObject();
                        msg_.put("code", 200);
                        msg_.put("msg", "SUCCESS");
                        msg_.put("method", "OFFLINE");
                        msg_.put("data", new Object());
                        this.sendMsgToClient(context, msg_.toJSONString());
                        TimerTask timerTask = new TimerTask() {
                            @Override
                            public void run() {
                                NettyChannelMap.remove(context);
                            }
                        };
                        Timer timer = new Timer();
                        timer.schedule(timerTask, 3000);
                        timer.cancel();
                    }
                    if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                        redisTemplate.opsForValue().set("USER_" + userId1, token);
                    }
                }
            // 获取socket信息,保存相应的socket
            JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
            int code = jsonMsg.getIntValue("code");
            String message = jsonMsg.getString("msg");
            String method = jsonMsg.getString("method");
            if (code != 200 || !message.equals("SUCCESS")) {
                return;
            }
            JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
                //存储业务使用的通道
                if(null != ctx && ctx.channel().isActive()) {
                    NettyChannelMap.update("Applets" + userId1, ctx);
                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                }
            if (null != ctx && ctx.channel().isActive()) {
                jsonMsg.put("method", Method.pong);
                sendMsgToClient(ctx, jsonMsg.toJSONString());
            }
            // ############################### 心跳  ############################
            // 心跳
            if (method.equals(Method.ping)) {
                String token = jsonCon.getString("token");
                String userId1 = jsonCon.getString("userId");
                if (StringUtil.isNotEmpty(userId1)) {
                    //确保账号在单个设备上登录
                    if (StringUtil.isNotEmpty(token)) {
                        String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据
                        if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据
                            ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
                            JSONObject msg_ = new JSONObject();
                            msg_.put("code", 200);
                            msg_.put("msg", "SUCCESS");
                            msg_.put("method", "OFFLINE");
                            msg_.put("data", new Object());
                            this.sendMsgToClient(data_, msg_.toJSONString());
                            new Timer().schedule(new TimerTask() {
                                @Override
                                public void run() {
                                    NettyChannelMap.remove_(data_);
                                }
                            }, 5000);
                        }
                        NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道
                        NettyChannelMap.update("Applets" + userId1, ctx);
                        redisUtil.setStrValue("USER_Applets_" + userId1, token);
                    }
                    //存储业务使用的通道
                    if (null != ctx && ctx.channel().isActive()) {
                        NettyChannelMap.update("Applets" + userId1, ctx);
                    }
                }
            }
        } catch (Exception e) {
            if (isdebug) {
                NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
            }
            e.printStackTrace();
        }
    } catch (Exception e) {
        if(isdebug) {
            this.sendMsgToClient(ctx, "__error__" + msg.toString());
        }
        e.printStackTrace();
    }
}
    /**
     * 向客户端发送消息
@@ -137,67 +132,69 @@
            ChannelFuture sync;
            try {
                sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
                if(!sync.isSuccess()){
                if (!sync.isSuccess()) {
                    boolean b = true;
                    for (int i = 0; i < 10; i++) {
                        ctx.wait(3000);
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        if(sync.isSuccess()){
                        if (sync.isSuccess()) {
                            b = false;
                            break;
                        }
                        System.err.println("小程序-》推送不成功,将继续推送"+msg);
                        System.err.println("小程序-》推送不成功,将继续推送" + msg);
                    }
                    if(b){
                    if (b) {
                        NettyChannelMap.remove(ctx);
                    }
                }
            } catch (Exception e) {
                System.err.println("小程序-》推送发生异常,记录:"+msg);
                System.err.println("小程序-》推送发生异常,记录:" + msg);
                NettyChannelMap.remove(ctx);
            }
            if(isdebug) {
                System.err.println("小程序-》 <<<--send-->>>" + msg) ;
            if (isdebug) {
                System.err.println("小程序-》 <<<--send-->>>" + msg);
            }
        }else{
        } else {
            System.err.println("小程序-》推送失败,长连接不存在");
            NettyChannelMap.remove(ctx);
        }
    }
    //    **链接断开 将推送消息记录
    public static void sendMsgToClient(String cacheType, Integer id,String msg) {
    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
        ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
        if (ctx != null) {
            ChannelFuture sync;
            try {
                sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                if(!sync.isSuccess()){
                if (!sync.isSuccess()) {
                    for (int i = 0; i < 10; i++) {
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
                        if(!sync.isSuccess()){
                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
                            System.err.println("推送不成功,将继续推送"+msg);
                            if(i == 9){
                                table.put(cacheType+id, msg);
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        ;
                        if (!sync.isSuccess()) {
                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                            ;
                            System.err.println("推送不成功,将继续推送" + msg);
                            if (i == 9) {
                                table.put(cacheType + id, msg);
                                ctx.close();
                                System.err.println("推送发生异常,记录:"+msg);
                                System.err.println("推送发生异常,记录:" + msg);
                            }
                        }else{
                        } else {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                table.put(cacheType+id, msg);
                System.err.println("推送发生异常,记录:"+msg);
                table.put(cacheType + id, msg);
                System.err.println("推送发生异常,记录:" + msg);
            }
            if(isdebug) {
            if (isdebug) {
                System.err.println("<<<--send-->>>" + msg);
            }
        }else{
            table.put(cacheType+id, msg);
            System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg);
        } else {
            table.put(cacheType + id, msg);
            System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
        }
    }
}
MessagePushTravel/src/main/java/com/sinata/push/util/applets/WebSocketHandler.java
@@ -1,7 +1,6 @@
package com.sinata.push.util.applets;
import com.alibaba.fastjson.JSONObject;
import com.sinata.push.util.SpringUtil;
import com.sinata.push.util.echo.Method;
import com.sinata.push.util.echo.NettyChannelMap;
import com.sinata.push.util.echo.NettyMsg;
@@ -16,7 +15,6 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.HashMap;
@@ -26,8 +24,6 @@
    private WebSocketServerHandshaker handshaker;
    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
//    private NettyWebSocketController nettyWebSocketController = SpringUtil.getObject(NettyWebSocketController.class);
@@ -77,13 +73,8 @@
    }
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        HttpHeaders headers = req.headers();
        headers.names().forEach(name -> {
            System.out.println(name + ":" + headers.get(name));
        });
        // Http解码失败,向服务器指定传输的协议为Upgrade:websocket
        if(!req.getDecoderResult().isSuccess() || !("websocket").equals(req.headers().get("Upgrade"))){
            System.out.println("Http解码失败");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
@@ -92,11 +83,9 @@
        // 根据工厂类和HTTP请求创建握手类
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            System.out.println("不支持websocket");
            // 不支持websocket
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            System.out.println("通过它构造握手响应消息返回给客户端");
            // 通过它构造握手响应消息返回给客户端
            handshaker.handshake(ctx.channel(), req);
        }
@@ -123,21 +112,18 @@
        //给连接的客户端返回数据
        //返回心跳
//        JSONObject jsonObject = new JSONObject();
//        jsonObject.put("code", 200);
//        jsonObject.put("method", Method.ok);
//        jsonObject.put("msg", "SUCCESS");
//        jsonObject.put("data", new JSONObject());
//        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 200);
        jsonObject.put("method", Method.ok);
        jsonObject.put("msg", "SUCCESS");
        jsonObject.put("data", new JSONObject());
        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
//        ctx.channel().writeAndFlush(tws);
        String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
        ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
//        nettyWebSocketController.JudgeOperation(ctx,requestmsg);//小程序心跳处理
        new NettyWebSocketController().JudgeOperation(ctx,requestmsg);//小程序心跳处理
        // 群发服务端心跳响应
//        Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
        Global.group.writeAndFlush(new TextWebSocketFrame((tws).text()));
    }
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
MessagePushTravel/src/main/java/com/sinata/push/util/echo/DiscardServerHandler.java
@@ -1,6 +1,5 @@
package com.sinata.push.util.echo;
import com.sinata.push.util.SpringUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@@ -8,14 +7,13 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.springframework.data.redis.core.RedisTemplate;
import java.net.InetSocketAddress;
import java.util.HashMap;
public class DiscardServerHandler extends SimpleChannelInboundHandler<String>  {
    private NettyServerController nettyServerController = SpringUtil.getObject(NettyServerController.class);
    private NettyServerController nettyServerController = new NettyServerController();
    
    public static boolean isdebug = true;
@@ -23,7 +21,7 @@
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        if(isdebug) {
            System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
//            System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
        }
        try {
            // 调用service
@@ -39,7 +37,7 @@
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        if(isdebug) {
            System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
//            System.err.println(insocket.getAddress() + ": 收到客户端数据.......");
        }
        try {
            // 调用service
@@ -70,7 +68,7 @@
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        if(isdebug) {
            System.err.println(insocket.getAddress() + ": Connect successful......");
//            System.err.println(insocket.getAddress() + ": Connect successful......");
        }
    }
@@ -106,7 +104,7 @@
//            System.err.println(insocket.getAddress() + ": Disconnect connection......");
        }
        NettyChannelMap.remove(ctx);
        System.err.println("清除通道" + ctx);
//        System.err.println("清除通道" + ctx);
//        super.channelInactive(ctx);
    }
MessagePushTravel/src/main/java/com/sinata/push/util/echo/Method.java
@@ -15,6 +15,9 @@
    /** 心跳【接收】 */
    public final static String ping = "PING";
    /** 心跳【响应】 */
    public final static String pong = "PONG";
    /** 司机上传位置 */
    public static final String location = "LOCATION";
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -26,7 +26,7 @@
     */
    public static ChannelHandlerContext getData(String key) {
        if(map==null){
            map = new ConcurrentHashMap<String, ChannelHandlerContext>();
            map = new HashMap<String, ChannelHandlerContext>();
        }
        return map.get(key);
    }
@@ -78,6 +78,9 @@
     */
    @SuppressWarnings("rawtypes")
    public static synchronized void remove(ChannelHandlerContext value) {
        if(null == value){
            return;
        }
        Set<String> strings = map.keySet();
        for(String key : strings){
            ChannelHandlerContext channelHandlerContext = map.get(key);
@@ -91,6 +94,23 @@
    }
    public static synchronized void remove_(ChannelHandlerContext value) {
        Set<String> strings = ctxMap.keySet();
        for(String key : strings){
            ChannelHandlerContext channelHandlerContext = ctxMap.get(key);
            String s = channelHandlerContext.channel().remoteAddress().toString();
            String s1 = value.channel().remoteAddress().toString();
            if(s.equals(s1)){
                channelHandlerContext.close();//关闭通道
                ctxMap.remove(key);
            }
        }
    }
    /**
     * Removes the entry stored under {@code key} from {@code ctxMap}.
     * This overload only drops the mapping; it does not itself call
     * {@code close()} on the associated context.
     *
     * @param key key of the entry to remove
     */
    public static synchronized void remove_(String key) {
        ctxMap.remove(key);
    }
    
    /**
@@ -113,4 +133,9 @@
        map.put(key, value);
    }
    /**
     * Stores {@code value} under {@code key} in {@code ctxMap}
     * (a plain {@code put}, so presumably any existing mapping for the key is replaced).
     *
     * @param key   key to register the context under
     * @param value channel handler context to store
     */
    public static synchronized void update_(String key, ChannelHandlerContext value) {
        ctxMap.put(key, value);
    }
}
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServer.java
@@ -36,7 +36,7 @@
            public void run() {
                thread.start();
            }
        }, 10000);
        }, 3999);
    }
    /**
@@ -61,7 +61,7 @@
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                // 通过TCP_NODELAY禁用NAGLE,使消息立即发出去,不用等待到一定的数据量才发出去
                bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
                bootstrap.option(ChannelOption.TCP_NODELAY, true);
                // 保持长连接状态
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.childHandler(new ServerInit() {
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -10,16 +10,16 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@@ -28,20 +28,17 @@
 * @createDate 2016年6月3日
 * @version 1.0
 */
@Component
public class NettyServerController {
    
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
    public static Hashtable<String,String> table;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    static{
@@ -61,7 +58,7 @@
     * @param msg
     * @author TaoNingBo
     */
    public void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
    public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) {
        try {
            // ByteBuf转String
            ByteBuf byteBuf = (ByteBuf) msg;
@@ -99,105 +96,87 @@
                String device = jsonCon.getString("device");
                String version = jsonCon.getString("version");
                if(StringUtil.isNotEmpty(userId1)){
                    String fluid_control = (String)redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
                    if(!StringUtils.hasLength(fluid_control)){
                        redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
                    }else{
                        long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
                        if(l >= 10000){
                            redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
                        }else{
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                            return;
                        }
                    }
                    //判断用户或者司机长连接
                    //判断用户或者司机长连接
                    if(type==1){
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
                            NettyChannelMap.update("USER" + userId1, ctx);
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                        //确保账号在单个设备上登录
                        if(StringUtil.isNotEmpty(token)){
                            String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
                            String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据
                            if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
                                ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1);
                                ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
                                JSONObject msg_ = new JSONObject();
                                msg_.put("code", 200);
                                msg_.put("msg", "SUCCESS");
                                msg_.put("method", "OFFLINE");
                                msg_.put("data", new Object());
                                this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
                                TimerTask timerTask = new TimerTask() {
                                boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
                                if(b){
                                    NettyChannelMap.remove_(data_);
                                }
                                new Timer().schedule(new TimerTask() {
                                    @Override
                                    public void run() {
                                        NettyChannelMap.remove(context);
                                        NettyChannelMap.remove_(data_);
                                    }
                                };
                                Timer timer = new Timer();
                                timer.schedule(timerTask, 3000);
                                timer.cancel();
                                }, 5000);
                            }
                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                                redisTemplate.opsForValue().set("USER_" + userId1, token);
                            }
                            NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
                            NettyChannelMap.update("USER" + userId1, ctx);
                            redisUtil.setStrValue("USER_APP_" + userId1, token);
                        }
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            NettyChannelMap.update("USER" + userId1, ctx);
                        }
                    }else{
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
                            NettyChannelMap.update("DRIVER" + userId1, ctx);
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                        //确保账号在单个设备上登录
                        String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
                        if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
                            String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
                        if(StringUtil.isNotEmpty(token)){//APP端登录的操作
                            String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
                            if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
                                ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1);
                                JSONObject msg_ = new JSONObject();
                                msg_.put("code", 200);
                                msg_.put("msg", "SUCCESS");
                                msg_.put("method", "OFFLINE");
                                msg_.put("data", new Object());
                                this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
                                TimerTask timerTask = new TimerTask() {
                                    @Override
                                    public void run() {
                                        NettyChannelMap.remove(context);
                                    }
                                };
                                Timer timer = new Timer();
                                timer.schedule(timerTask, 3000);
                                timer.cancel();
                                ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
                                if(null != data_){
                                    JSONObject msg_ = new JSONObject();
                                    msg_.put("code", 200);
                                    msg_.put("msg", "SUCCESS");
                                    msg_.put("method", "OFFLINE");
                                    msg_.put("data", new Object());
                                    boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息
                                    if(b){
                                        NettyChannelMap.remove_(data_);
                                    }
                                }
                            }
                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                                redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
                            }
                            NettyChannelMap.update("DRIVER" + userId1, ctx);
                            NettyChannelMap.update_(token.substring(token.length() - 16), ctx);
                            redisUtil.setStrValue("DRIVER_" + userId1, token);
                        }
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            NettyChannelMap.update("DRIVER" + userId1, ctx);
                        }
                    }
                }
                if(null != ctx && ctx.channel().isActive()){
                    jsonMsg.put("method", Method.pong);
                    sendMsgToClient(ctx, jsonMsg.toJSONString());
                }
            }
            //司机上传位置
            if(method.equals(Method.location)){
                Integer driverId = jsonCon.getInteger("driverId");
                String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId);
                String fluid_control = redisUtil.getValue("location_" + driverId);
                if(!StringUtils.hasLength(fluid_control)){
                    redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
                    redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + "");
                }else{
                    long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
                    if(l < 5000){
                        return;
                    }
                    redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
                    redisUtil.setStrValue("location_" + driverId, System.currentTimeMillis() + "");
                }
                Integer orderId = jsonCon.getInteger("orderId");
                Integer orderType = jsonCon.getInteger("orderType");
                Double lon = jsonCon.getDouble("lon");
@@ -228,7 +207,7 @@
                                System.err.println("调用driver-server存储位置数据出错了");
                            }
                        }
                        redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 30, TimeUnit.SECONDS);//实时位置存入redis中
                        redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中
                    }else{
                        this.sendMsgToClient(ctx, "__error__" + msg.toString());
                    }
@@ -239,7 +218,7 @@
            
        } catch (Exception e) {
            if(isdebug) {
                this.sendMsgToClient(ctx, "__error__" + msg.toString());
                NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
            }
            e.printStackTrace();
        }
@@ -252,7 +231,7 @@
     * @param msg
     * @author TaoNingBo
     */
    public void sendMsgToClient(ChannelHandlerContext ctx, String msg) {
    public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) {
        if (ctx != null && ctx.channel().isActive()) {
            ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes());
            ChannelFuture sync;
@@ -272,7 +251,9 @@
                    if(b){
                        NettyChannelMap.remove(ctx);
                    }
                    return true;
                }
                return sync.isSuccess();
            } catch (Exception e) {
                System.err.println("推送发生异常,记录:"+msg);
                NettyChannelMap.remove(ctx);
@@ -284,6 +265,7 @@
            System.err.println("推送失败,长连接不存在");
            NettyChannelMap.remove(ctx);
        }
        return false;
    }
//    **链接断开 将推送消息记录