From 1c40baaf9ca0183945b9881d11ceed5aeebc8290 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期四, 23 十月 2025 11:35:44 +0800
Subject: [PATCH] 修改bug
---
MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java | 204 ++++++++++++++++++++++++++-------------------------
1 files changed, 104 insertions(+), 100 deletions(-)
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
index da05e85..0513409 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/applets/NettyWebSocketController.java
@@ -2,38 +2,36 @@
import com.alibaba.fastjson.JSONObject;
+import com.sinata.push.util.RedisUtil;
import com.sinata.push.util.SinataUtil;
import com.sinata.push.util.SpringUtil;
import com.sinata.push.util.StringUtil;
import com.sinata.push.util.echo.Method;
import com.sinata.push.util.echo.NettyChannelMap;
-import com.sinata.push.util.echo.NettyMsg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Timer;
import java.util.TimerTask;
public class NettyWebSocketController {
- public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext,String>>();
+ public static Hashtable<String, Hashtable<ChannelHandlerContext, String>> map = new Hashtable<String, Hashtable<ChannelHandlerContext, String>>();
- private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class);
+ private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
- public static Hashtable<String,String> table;
- static{
- if(table == null){
- table = new Hashtable<>();
- }
- }
-
+ public static Hashtable<String, String> table;
+
+ static {
+ if (table == null) {
+ table = new Hashtable<>();
+ }
+ }
+
public static boolean isdebug = false;
public static int i = 0;
@@ -45,78 +43,82 @@
* @param msg
* @author TaoNingBo
*/
- public void JudgeOperation(ChannelHandlerContext ctx, String msg) {
+ public synchronized void JudgeOperation(ChannelHandlerContext ctx, String msg) {
try {
- // 验证即时通讯命令是否正确有效
- if(SinataUtil.isEmpty(msg)) {
- return;
- }
- String msgStr = msg.toString();
- if(msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
- return;
- }
- if(isdebug) {
+ // 验证即时通讯命令是否正确有效
+ if (SinataUtil.isEmpty(msg)) {
+ return;
+ }
+ String msgStr = msg.toString();
+ if (msgStr.indexOf("{") == -1 || msgStr.indexOf("}") == -1 || msgStr.indexOf("code") == -1 || msgStr.indexOf("msg") == -1 || msgStr.indexOf("data") == -1 || msgStr.indexOf("method") == -1) {
+ return;
+ }
+ if (isdebug) {
// System.out.println("<<<--receive-->>>111" + msg);
- }
+ }
- // 获取socket信息,保存相应的socket
- JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
- int code = jsonMsg.getIntValue("code");
- String message = jsonMsg.getString("msg");
- String method = jsonMsg.getString("method");
- if(code != 200 || !message.equals("SUCCESS")) {
- return;
- }
- JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
- // ############################### 心跳 ############################
- // 心跳
- if(method.equals(Method.ping)){
- String token = jsonCon.getString("token");
- String userId1 = jsonCon.getString("userId");
- if(StringUtil.isNotEmpty(userId1)){
- //确保账号在单个设备上登录
- if(StringUtil.isNotEmpty(token)){
- String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
- if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
- ChannelHandlerContext context = NettyChannelMap.getData("Applets" + userId1);
- JSONObject msg_ = new JSONObject();
- msg_.put("code", 200);
- msg_.put("msg", "SUCCESS");
- msg_.put("method", "OFFLINE");
- msg_.put("data", new Object());
- this.sendMsgToClient(context, msg_.toJSONString());
- TimerTask timerTask = new TimerTask() {
- @Override
- public void run() {
- NettyChannelMap.remove(context);
- }
- };
- Timer timer = new Timer();
- timer.schedule(timerTask, 3000);
- timer.cancel();
- }
- if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
- redisTemplate.opsForValue().set("USER_" + userId1, token);
- }
- }
+ // 获取socket信息,保存相应的socket
+ JSONObject jsonMsg = JSONObject.parseObject(msg.toString());
+ int code = jsonMsg.getIntValue("code");
+ String message = jsonMsg.getString("msg");
+ String method = jsonMsg.getString("method");
+ if (code != 200 || !message.equals("SUCCESS")) {
+ return;
+ }
+ JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
- //存储业务使用的通道
- if(null != ctx && ctx.channel().isActive()) {
- NettyChannelMap.update("Applets" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
+ if (null != ctx && ctx.channel().isActive()) {
+ jsonMsg.put("method", Method.pong);
+ sendMsgToClient(ctx, jsonMsg.toJSONString());
}
+ // ############################### 心跳 ############################
+ // 心跳
+ if (method.equals(Method.ping)) {
+ String token = jsonCon.getString("token");
+ String userId1 = jsonCon.getString("userId");
+ String businessType = jsonCon.getString("businessType");//1:打车,2=代驾
+ String business = "2".equals(businessType) ? "daijia" : "dache";
+ if (StringUtil.isNotEmpty(userId1)) {
+ //确保账号在单个设备上登录
+ if (StringUtil.isNotEmpty(token)) {
+ String token_ = redisUtil.getValue(business + ":USER_Applets_" + userId1);//获取缓存中最新的数据
+ if (StringUtil.isNotEmpty(token_) && !token.equals(token_)) {//不在同一设备上登录,向其他设备发送数据
+ ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16));
+ JSONObject msg_ = new JSONObject();
+ msg_.put("code", 200);
+ msg_.put("msg", "SUCCESS");
+ msg_.put("method", "OFFLINE");
+ msg_.put("data", new Object());
+ this.sendMsgToClient(data_, msg_.toJSONString());
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ NettyChannelMap.remove_(data_);
+ }
+ }, 5000);
+ }
+ NettyChannelMap.update_(token.substring(token.length() - 16), ctx);//存储单点登录的通道
+ NettyChannelMap.update(business + ":Applets" + userId1, ctx);
+ redisUtil.setStrValue(business + ":USER_Applets_" + userId1, token);
+ }
+
+ //存储业务使用的通道
+ if (null != ctx && ctx.channel().isActive()) {
+ NettyChannelMap.update(business + ":Applets" + userId1, ctx);
+ }
+ }
+
+
+ }
+ } catch (Exception e) {
+ if (isdebug) {
+ NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
+ }
+ e.printStackTrace();
}
- } catch (Exception e) {
- if(isdebug) {
- NettyWebSocketController.sendMsgToClient(ctx, "__error__" + msg.toString());
- }
- e.printStackTrace();
}
-}
/**
* 向客户端发送消息
@@ -132,67 +134,69 @@
ChannelFuture sync;
try {
sync = ctx.channel().writeAndFlush(new TextWebSocketFrame(msg)).sync();
- if(!sync.isSuccess()){
+ if (!sync.isSuccess()) {
boolean b = true;
for (int i = 0; i < 10; i++) {
ctx.wait(3000);
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
- if(sync.isSuccess()){
+ if (sync.isSuccess()) {
b = false;
break;
}
- System.err.println("小程序-》推送不成功,将继续推送"+msg);
+ System.err.println("小程序-》推送不成功,将继续推送" + msg);
}
- if(b){
+ if (b) {
NettyChannelMap.remove(ctx);
}
}
} catch (Exception e) {
- System.err.println("小程序-》推送发生异常,记录:"+msg);
+ System.err.println("小程序-》推送发生异常,记录:" + msg);
NettyChannelMap.remove(ctx);
}
- if(isdebug) {
- System.err.println("小程序-》 <<<--send-->>>" + msg) ;
+ if (isdebug) {
+ System.err.println("小程序-》 <<<--send-->>>" + msg);
}
- }else{
+ } else {
System.err.println("小程序-》推送失败,长连接不存在");
NettyChannelMap.remove(ctx);
}
}
// **链接断开 将推送消息记录
- public static void sendMsgToClient(String cacheType, Integer id,String msg) {
+ public static void sendMsgToClient(String cacheType, Integer id, String msg) {
ChannelHandlerContext ctx = NettyChannelMap.getData(cacheType + id);
if (ctx != null) {
ChannelFuture sync;
try {
sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
- if(!sync.isSuccess()){
+ if (!sync.isSuccess()) {
for (int i = 0; i < 10; i++) {
- sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
- if(!sync.isSuccess()){
- sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();;
- System.err.println("推送不成功,将继续推送"+msg);
- if(i == 9){
- table.put(cacheType+id, msg);
+ sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
+ ;
+ if (!sync.isSuccess()) {
+ sync = ctx.channel().write(new TextWebSocketFrame(msg)).sync();
+ ;
+ System.err.println("推送不成功,将继续推送" + msg);
+ if (i == 9) {
+ table.put(cacheType + id, msg);
ctx.close();
- System.err.println("推送发生异常,记录:"+msg);
+ System.err.println("推送发生异常,记录:" + msg);
}
- }else{
+ } else {
break;
}
}
}
} catch (Exception e) {
- table.put(cacheType+id, msg);
- System.err.println("推送发生异常,记录:"+msg);
+ table.put(cacheType + id, msg);
+ System.err.println("推送发生异常,记录:" + msg);
}
- if(isdebug) {
+ if (isdebug) {
System.err.println("<<<--send-->>>" + msg);
}
- }else{
- table.put(cacheType+id, msg);
- System.err.println("链接断开,记录:id="+cacheType+id+",消息:"+msg);
+ } else {
+ table.put(cacheType + id, msg);
+ System.err.println("链接断开,记录:id=" + cacheType + id + ",消息:" + msg);
}
}
}
--
Gitblit v1.7.1