From bc2cb25734bcf18474ed41a35ac4d54fd976f523 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期一, 11 八月 2025 19:37:18 +0800
Subject: [PATCH] 合并代码

---
 MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java |  211 ++++++++++++++++++++++++++--------------------------
 1 files changed, 105 insertions(+), 106 deletions(-)

diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
index dddbca5..f46c696 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -2,43 +2,36 @@
 
 
 import com.alibaba.fastjson.JSONObject;
+import com.sinata.push.util.RedisUtil;
 import com.sinata.push.util.SinataUtil;
 import com.sinata.push.util.SpringUtil;
 import com.sinata.push.util.StringUtil;
 import com.sinata.push.util.echo.Method;
 import com.sinata.push.util.echo.NettyChannelMap;
-import com.sinata.push.util.echo.NettyMsg;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Timer;
 import java.util.TimerTask;
 
-@Component
 public class NettyWebSocketController {
 
-    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
-    
-    @Resource
-    private RedisTemplate<String, Object> redisTemplate;
+    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
 
-	public static Hashtable<String,String> table;
-	static{
-		if(table == null){
-			table = new Hashtable<>();
-		}
-	}
-	
+    private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
+
+    public static Hashtable<String, String> table;
+
+    static {
+        if (table == null) {
+            table = new Hashtable<>();
+        }
+    }
+
     public static boolean isdebug = false;
     public static int i = 0;
 
@@ -50,78 +43,82 @@
      * @param msg
      * @author TaoNingBo
      */
-    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
+    public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) {
         try {
-        // 验证即时通讯命令是否正确有效
-        if(SinataUtil.isEmpty(msg)) {
-            return;
-        }
-        String msgStr = msg.toString();
-        if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
-            return;
-        }
-        if(isdebug) {
+            // 验证即时通讯命令是否正确有效
+            if (SinataUtil.isEmpty(msg)) {
+                return;
+            }
+            String msgStr = msg.toString();
+            if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
+                return;
+            }
+            if (isdebug) {
 //            System.out.println("<<<--receive-->>>111" + msg);
-        }
+            }
 
-        // 获取socket信息,保存相应的socket
-        JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
-        int code = jsonMsg.getIntValue("code");
-        String message = jsonMsg.getString("msg");
-        String method = jsonMsg.getString("method");
-        if(code != 200 || !message.equals("SUCCESS")) {
-            return;
-        }
-        JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
-        // ############################### 心跳  ############################
-        // 心跳
-        if(method.equals(Method.ping)){
-            String token = jsonCon.getString("token");
-            String userId1 = jsonCon.getString("userId");
-            if(StringUtil.isNotEmpty(userId1)){
-                //确保账号在单个设备上登录
-                if(StringUtil.isNotEmpty(token)){
-                    String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
-                    if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
-                        ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1);
-                        JSONObject msg_ = new JSONObject();
-                        msg_.put("code", 200);
-                        msg_.put("msg", "SUCCESS");
-                        msg_.put("method", "OFFLINE");
-                        msg_.put("data", new Object());
-                        this.sendMsgToClient(context, msg_.toJSONString());
-                        TimerTask timerTask = new TimerTask() {
-                            @Override
-                            public void run() {
-                                NettyChannelMap.remove(context);
-                            }
-                        };
-                        Timer timer = new Timer();
-                        timer.schedule(timerTask, 3000);
-                        timer.cancel();
-                    }
-                    if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-                        redisTemplate.opsForValue().set("USER_" + userId1, token);
-                    }
-                }
+            // 获取socket信息,保存相应的socket
+            JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
+            int code = jsonMsg.getIntValue("code");
+            String message = jsonMsg.getString("msg");
+            String method = jsonMsg.getString("method");
+            if (code != 200 || !message.equals("SUCCESS")) {
+                return;
+            }
+            JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
 
-                //存储业务使用的通道
-                if(null != ctx && ctx.channel().isActive()) {
-                    NettyChannelMap.update("Applets" + userId1, ctx);
-                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-                }
+            if (null != ctx && ctx.channel().isActive()) {
+                jsonMsg.put("method", Method.pong);
+                sendMsgToClient(ctx, jsonMsg.toJSONString());
             }
 
 
+            // ############################### 心跳  ############################
+            // 心跳
+            if (method.equals(Method.ping)) {
+                String token = jsonCon.getString("token");
+                String userId1 = jsonCon.getString("userId");
+                String businessType = jsonCon.getString("businessType");//1:打车,2=代驾
+                String business = "1".equals(businessType) ? "dache" : "daijia";
+                if (StringUtil.isNotEmpty(userId1)) {
+                    //确保账号在单个设备上登录
+                    if (StringUtil.isNotEmpty(token)) {
+                        String token_ = redisUtil.getValue(business + ":USER_Applets_" + userId1);//获取缓存中最新的数据
+                        if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据
+                            ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
+                            JSONObject msg_ = new JSONObject();
+                            msg_.put("code", 200);
+                            msg_.put("msg", "SUCCESS");
+                            msg_.put("method", "OFFLINE");
+                            msg_.put("data", new Object());
+                            this.sendMsgToClient(data_, msg_.toJSONString());
+                            new Timer().schedule(new TimerTask() {
+                                @Override
+                                public void run() {
+                                    NettyChannelMap.remove_(data_);
+                                }
+                            }, 5000);
+                        }
+                        NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道
+                        NettyChannelMap.update(business + ":Applets" + userId1, ctx);
+                        redisUtil.setStrValue(business + ":USER_Applets_" + userId1, token);
+                    }
+
+                    //存储业务使用的通道
+                    if (null != ctx && ctx.channel().isActive()) {
+                        NettyChannelMap.update(business + ":Applets" + userId1, ctx);
+                    }
+                }
+
+
+            }
+        } catch (Exception e) {
+            if (isdebug) {
+                NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
+            }
+            e.printStackTrace();
         }
-    } catch (Exception e) {
-        if(isdebug) {
-        	this.sendMsgToClient(ctx, "__error__" + msg.toString());
-        }
-        e.printStackTrace();
     }
-}
 
     /**
      * 向客户端发送消息
@@ -137,67 +134,69 @@
             ChannelFuture sync;
             try {
                 sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
-                if(!sync.isSuccess()){
+                if (!sync.isSuccess()) {
                     boolean b = true;
                     for (int i = 0; i < 10; i++) {
                         ctx.wait(3000);
                         sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
-                        if(sync.isSuccess()){
+                        if (sync.isSuccess()) {
                             b = false;
                             break;
                         }
-                        System.err.println("小程序-》推送不成功,将继续推送"+msg);
+                        System.err.println("小程序-》推送不成功,将继续推送" + msg);
                     }
-                    if(b){
+                    if (b) {
                         NettyChannelMap.remove(ctx);
                     }
                 }
             } catch (Exception e) {
-                System.err.println("小程序-》推送发生异常,记录:"+msg);
+                System.err.println("小程序-》推送发生异常,记录:" + msg);
                 NettyChannelMap.remove(ctx);
             }
-            if(isdebug) {
-                System.err.println("小程序-》 <<<--send-->>>" + msg) ;
+            if (isdebug) {
+                System.err.println("小程序-》 <<<--send-->>>" + msg);
             }
-        }else{
+        } else {
             System.err.println("小程序-》推送失败,长连接不存在");
             NettyChannelMap.remove(ctx);
         }
     }
 
     //	**链接断开 将推送消息记录
-    public static void sendMsgToClient(String cacheType, Integer id,String msg) {
+    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
         ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
         if (ctx != null) {
             ChannelFuture sync;
             try {
                 sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
-                if(!sync.isSuccess()){
+                if (!sync.isSuccess()) {
                     for (int i = 0; i < 10; i++) {
-                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
-                        if(!sync.isSuccess()){
-                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
-                            System.err.println("推送不成功,将继续推送"+msg);
-                            if(i == 9){
-                                table.put(cacheType+id, msg);
+                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
+                        ;
+                        if (!sync.isSuccess()) {
+                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
+                            ;
+                            System.err.println("推送不成功,将继续推送" + msg);
+                            if (i == 9) {
+                                table.put(cacheType + id, msg);
                                 ctx.close();
-                                System.err.println("推送发生异常,记录:"+msg);
+                                System.err.println("推送发生异常,记录:" + msg);
                             }
-                        }else{
+                        } else {
                             break;
                         }
                     }
                 }
             } catch (Exception e) {
-                table.put(cacheType+id, msg);
-                System.err.println("推送发生异常,记录:"+msg);
+                table.put(cacheType + id, msg);
+                System.err.println("推送发生异常,记录:" + msg);
             }
-            if(isdebug) {
+            if (isdebug) {
                 System.err.println("<<<--send-->>>" + msg);
             }
-        }else{
-            table.put(cacheType+id, msg);
-            System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg);
+        } else {
+            table.put(cacheType + id, msg);
+            System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
         }
     }
 }

--
Gitblit v1.7.1