From bc2cb25734bcf18474ed41a35ac4d54fd976f523 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期一, 11 八月 2025 19:37:18 +0800 Subject: [PATCH] 合并代码 --- MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java | 211 ++++++++++++++++++++++++++-------------------------- 1 files changed, 105 insertions(+), 106 deletions(-) diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java index dddbca5..f46c696 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java @@ -2,43 +2,36 @@ import com.alibaba.fastjson.JSONObject; +import com.sinata.push.util.RedisUtil; import com.sinata.push.util.SinataUtil; import com.sinata.push.util.SpringUtil; import com.sinata.push.util.StringUtil; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; -import com.sinata.push.util.echo.NettyMsg; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.util.HashMap; import java.util.Hashtable; import java.util.Timer; import java.util.TimerTask; -@Component public class NettyWebSocketController { - public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>(); - - @Resource - private RedisTemplate<String, Object> redisTemplate; + public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>(); - public static Hashtable<String,String> table; - static{ - if(table == null){ - table = new Hashtable<>(); - } - } - + private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + + public static Hashtable<String, String> table; + + static { + if (table == null) { + table = new Hashtable<>(); + } + } + public static boolean isdebug = false; public static int i = 0; @@ -50,78 +43,82 @@ * @param msg * @author TaoNingBo */ - public void JudgeOperation(ChannelHandlerContext ctx, String msg) { + public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) { try { - // 验证即时通讯命令是否正确有效 - if(SinataUtil.isEmpty(msg)) { - return; - } - String msgStr = msg.toString(); - if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { - return; - } - if(isdebug) { + // 验证即时通讯命令是否正确有效 + if (SinataUtil.isEmpty(msg)) { + return; + } + String msgStr = msg.toString(); + if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) { + return; + } + if (isdebug) { // System.out.println("<<<--receive-->>>111" + msg); - } + } - // 获取socket信息,保存相应的socket - JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); - int code = jsonMsg.getIntValue("code"); - String message = jsonMsg.getString("msg"); - String method = jsonMsg.getString("method"); - if(code != 200 || !message.equals("SUCCESS")) { - return; - } - JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); - // ############################### 心跳 ############################ - // 心跳 - if(method.equals(Method.ping)){ - String token = jsonCon.getString("token"); - String userId1 = jsonCon.getString("userId"); - if(StringUtil.isNotEmpty(userId1)){ - //确保账号在单个设备上登录 - if(StringUtil.isNotEmpty(token)){ - String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 - if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 - ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1); - JSONObject msg_ = new JSONObject(); - msg_.put("code", 200); - msg_.put("msg", "SUCCESS"); - msg_.put("method", "OFFLINE"); - msg_.put("data", new Object()); - this.sendMsgToClient(context, msg_.toJSONString()); - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - NettyChannelMap.remove(context); - } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); - } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("USER_" + userId1, token); - } - } + // 获取socket信息,保存相应的socket + JSONObject jsonMsg = JSONObject.parseObject(msg.toString()); + int code = jsonMsg.getIntValue("code"); + String message = jsonMsg.getString("msg"); + String method = jsonMsg.getString("method"); + if (code != 200 || !message.equals("SUCCESS")) { + return; + } + JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); - //存储业务使用的通道 - if(null != ctx && ctx.channel().isActive()) { - NettyChannelMap.update("Applets" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } + if (null != ctx && ctx.channel().isActive()) { + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); } + // ############################### 心跳 ############################ + // 心跳 + if (method.equals(Method.ping)) { + String token = jsonCon.getString("token"); + String userId1 = jsonCon.getString("userId"); + String businessType = jsonCon.getString("businessType");//1:打车,2=代驾 + String business = "1".equals(businessType) ? "dache" : "daijia"; + if (StringUtil.isNotEmpty(userId1)) { + //确保账号在单个设备上登录 + if (StringUtil.isNotEmpty(token)) { + String token_ = redisUtil.getValue(business + ":USER_Applets_" + userId1);//获取缓存中最新的数据 + if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据 + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); + JSONObject msg_ = new JSONObject(); + msg_.put("code", 200); + msg_.put("msg", "SUCCESS"); + msg_.put("method", "OFFLINE"); + msg_.put("data", new Object()); + this.sendMsgToClient(data_, msg_.toJSONString()); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + NettyChannelMap.remove_(data_); + } + }, 5000); + } + NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道 + NettyChannelMap.update(business + ":Applets" + userId1, ctx); + redisUtil.setStrValue(business + ":USER_Applets_" + userId1, token); + } + + //存储业务使用的通道 + if (null != ctx && ctx.channel().isActive()) { + NettyChannelMap.update(business + ":Applets" + userId1, ctx); + } + } + + + } + } catch (Exception e) { + if (isdebug) { + NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString()); + } + e.printStackTrace(); } - } catch (Exception e) { - if(isdebug) { - this.sendMsgToClient(ctx, "__error__" + msg.toString()); - } - e.printStackTrace(); } -} /** * 向客户端发送消息 @@ -137,67 +134,69 @@ ChannelFuture sync; try { sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync(); - if(!sync.isSuccess()){ + if (!sync.isSuccess()) { boolean b = true; for (int i = 0; i < 10; i++) { ctx.wait(3000); sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); - if(sync.isSuccess()){ + if (sync.isSuccess()) { b = false; break; } - System.err.println("小程序-》推送不成功,将继续推送"+msg); + System.err.println("小程序-》推送不成功,将继续推送" + msg); } - if(b){ + if (b) { NettyChannelMap.remove(ctx); } } } catch (Exception e) { - System.err.println("小程序-》推送发生异常,记录:"+msg); + System.err.println("小程序-》推送发生异常,记录:" + msg); NettyChannelMap.remove(ctx); } - if(isdebug) { - System.err.println("小程序-》 <<<--send-->>>" + msg) ; + if (isdebug) { + System.err.println("小程序-》 <<<--send-->>>" + msg); } - }else{ + } else { System.err.println("小程序-》推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } } // **链接断开 将推送消息记录 - public static void sendMsgToClient(String cacheType, Integer id,String msg) { + public static void sendMsgToClient(String cacheType, Integer id, String msg) { ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id); if (ctx != null) { ChannelFuture sync; try { sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); - if(!sync.isSuccess()){ + if (!sync.isSuccess()) { for (int i = 0; i < 10; i++) { - sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; - if(!sync.isSuccess()){ - sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();; - System.err.println("推送不成功,将继续推送"+msg); - if(i == 9){ - table.put(cacheType+id, msg); + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + if (!sync.isSuccess()) { + sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync(); + ; + System.err.println("推送不成功,将继续推送" + msg); + if (i == 9) { + table.put(cacheType + id, msg); ctx.close(); - System.err.println("推送发生异常,记录:"+msg); + System.err.println("推送发生异常,记录:" + msg); } - }else{ + } else { break; } } } } catch (Exception e) { - table.put(cacheType+id, msg); - System.err.println("推送发生异常,记录:"+msg); + table.put(cacheType + id, msg); + System.err.println("推送发生异常,记录:" + msg); } - if(isdebug) { + if (isdebug) { System.err.println("<<<--send-->>>" + msg); } - }else{ - table.put(cacheType+id, msg); - System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg); + } else { + table.put(cacheType + id, msg); + System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg); } } } -- Gitblit v1.7.1