Pu Zhibing
2025-08-05 4872bb7719c4ccaaab99438af3d987787c818c2a
MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -2,43 +2,36 @@
import com.alibaba.fastjson.JSONObject;
import com.sinata.push.util.RedisUtil;
import com.sinata.push.util.SinataUtil;
import com.sinata.push.util.SpringUtil;
import com.sinata.push.util.StringUtil;
import com.sinata.push.util.echo.Method;
import com.sinata.push.util.echo.NettyChannelMap;
import com.sinata.push.util.echo.NettyMsg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Timer;
import java.util.TimerTask;
@Component
public class NettyWebSocketController {
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
   public static Hashtable<String,String> table;
   static{
      if(table == null){
         table = new Hashtable<>();
      }
   }
    private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
    public static Hashtable<String, String> table;
    static {
        if (table == null) {
            table = new Hashtable<>();
        }
    }
    public static boolean isdebug = false;
    public static int i = 0;
@@ -50,78 +43,80 @@
     * @param msg
     * @author TaoNingBo
     */
    public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
    public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) {
        try {
        // 验证即时通讯命令是否正确有效
        if(SinataUtil.isEmpty(msg)) {
            return;
        }
        String msgStr = msg.toString();
        if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
            return;
        }
        if(isdebug) {
            // 验证即时通讯命令是否正确有效
            if (SinataUtil.isEmpty(msg)) {
                return;
            }
            String msgStr = msg.toString();
            if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
                return;
            }
            if (isdebug) {
//            System.out.println("<<<--receive-->>>111" + msg);
        }
            }
        // 获取socket信息,保存相应的socket
        JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
        int code = jsonMsg.getIntValue("code");
        String message = jsonMsg.getString("msg");
        String method = jsonMsg.getString("method");
        if(code != 200 || !message.equals("SUCCESS")) {
            return;
        }
        JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
        // ############################### 心跳  ############################
        // 心跳
        if(method.equals(Method.ping)){
            String token = jsonCon.getString("token");
            String userId1 = jsonCon.getString("userId");
            if(StringUtil.isNotEmpty(userId1)){
                //确保账号在单个设备上登录
                if(StringUtil.isNotEmpty(token)){
                    String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
                    if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
                        ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1);
                        JSONObject msg_ = new JSONObject();
                        msg_.put("code", 200);
                        msg_.put("msg", "SUCCESS");
                        msg_.put("method", "OFFLINE");
                        msg_.put("data", new Object());
                        this.sendMsgToClient(context, msg_.toJSONString());
                        TimerTask timerTask = new TimerTask() {
                            @Override
                            public void run() {
                                NettyChannelMap.remove(context);
                            }
                        };
                        Timer timer = new Timer();
                        timer.schedule(timerTask, 3000);
                        timer.cancel();
                    }
                    if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                        redisTemplate.opsForValue().set("USER_" + userId1, token);
                    }
                }
            // 获取socket信息,保存相应的socket
            JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
            int code = jsonMsg.getIntValue("code");
            String message = jsonMsg.getString("msg");
            String method = jsonMsg.getString("method");
            if (code != 200 || !message.equals("SUCCESS")) {
                return;
            }
            JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
                //存储业务使用的通道
                if(null != ctx && ctx.channel().isActive()) {
                    NettyChannelMap.update("Applets" + userId1, ctx);
                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                }
            if (null != ctx && ctx.channel().isActive()) {
                jsonMsg.put("method", Method.pong);
                sendMsgToClient(ctx, jsonMsg.toJSONString());
            }
            // ############################### 心跳  ############################
            // 心跳
            if (method.equals(Method.ping)) {
                String token = jsonCon.getString("token");
                String userId1 = jsonCon.getString("userId");
                if (StringUtil.isNotEmpty(userId1)) {
                    //确保账号在单个设备上登录
                    if (StringUtil.isNotEmpty(token)) {
                        String token_ = redisUtil.getValue("USER_Applets_" + userId1);//获取缓存中最新的数据
                        if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据
                            ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
                            JSONObject msg_ = new JSONObject();
                            msg_.put("code", 200);
                            msg_.put("msg", "SUCCESS");
                            msg_.put("method", "OFFLINE");
                            msg_.put("data", new Object());
                            this.sendMsgToClient(data_, msg_.toJSONString());
                            new Timer().schedule(new TimerTask() {
                                @Override
                                public void run() {
                                    NettyChannelMap.remove_(data_);
                                }
                            }, 5000);
                        }
                        NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道
                        NettyChannelMap.update("Applets" + userId1, ctx);
                        redisUtil.setStrValue("USER_Applets_" + userId1, token);
                    }
                    //存储业务使用的通道
                    if (null != ctx && ctx.channel().isActive()) {
                        NettyChannelMap.update("Applets" + userId1, ctx);
                    }
                }
            }
        } catch (Exception e) {
            if (isdebug) {
                NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
            }
            e.printStackTrace();
        }
    } catch (Exception e) {
        if(isdebug) {
           this.sendMsgToClient(ctx, "__error__" + msg.toString());
        }
        e.printStackTrace();
    }
}
    /**
     * 向客户端发送消息
@@ -137,67 +132,69 @@
            ChannelFuture sync;
            try {
                sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
                if(!sync.isSuccess()){
                if (!sync.isSuccess()) {
                    boolean b = true;
                    for (int i = 0; i < 10; i++) {
                        ctx.wait(3000);
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        if(sync.isSuccess()){
                        if (sync.isSuccess()) {
                            b = false;
                            break;
                        }
                        System.err.println("小程序-》推送不成功,将继续推送"+msg);
                        System.err.println("小程序-》推送不成功,将继续推送" + msg);
                    }
                    if(b){
                    if (b) {
                        NettyChannelMap.remove(ctx);
                    }
                }
            } catch (Exception e) {
                System.err.println("小程序-》推送发生异常,记录:"+msg);
                System.err.println("小程序-》推送发生异常,记录:" + msg);
                NettyChannelMap.remove(ctx);
            }
            if(isdebug) {
                System.err.println("小程序-》 <<<--send-->>>" + msg) ;
            if (isdebug) {
                System.err.println("小程序-》 <<<--send-->>>" + msg);
            }
        }else{
        } else {
            System.err.println("小程序-》推送失败,长连接不存在");
            NettyChannelMap.remove(ctx);
        }
    }
    //   **链接断开 将推送消息记录
    public static void sendMsgToClient(String cacheType, Integer id,String msg) {
    public static void sendMsgToClient(String cacheType, Integer id, String msg) {
        ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
        if (ctx != null) {
            ChannelFuture sync;
            try {
                sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                if(!sync.isSuccess()){
                if (!sync.isSuccess()) {
                    for (int i = 0; i < 10; i++) {
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
                        if(!sync.isSuccess()){
                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
                            System.err.println("推送不成功,将继续推送"+msg);
                            if(i == 9){
                                table.put(cacheType+id, msg);
                        sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                        ;
                        if (!sync.isSuccess()) {
                            sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
                            ;
                            System.err.println("推送不成功,将继续推送" + msg);
                            if (i == 9) {
                                table.put(cacheType + id, msg);
                                ctx.close();
                                System.err.println("推送发生异常,记录:"+msg);
                                System.err.println("推送发生异常,记录:" + msg);
                            }
                        }else{
                        } else {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                table.put(cacheType+id, msg);
                System.err.println("推送发生异常,记录:"+msg);
                table.put(cacheType + id, msg);
                System.err.println("推送发生异常,记录:" + msg);
            }
            if(isdebug) {
            if (isdebug) {
                System.err.println("<<<--send-->>>" + msg);
            }
        }else{
            table.put(cacheType+id, msg);
            System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg);
        } else {
            table.put(cacheType + id, msg);
            System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
        }
    }
}