From c1d4849a2cc28cc50c5405f2382a5e4bf008b891 Mon Sep 17 00:00:00 2001 From: liujie <1793218484@qq.com> Date: 星期二, 12 八月 2025 10:16:45 +0800 Subject: [PATCH] update --- MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 218 +++++++++++++++++++++++------------------------------- 1 files changed, 94 insertions(+), 124 deletions(-) diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java index 4e003bd..97bc8f0 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java @@ -10,8 +10,6 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; @@ -22,7 +20,6 @@ import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.TimeUnit; /** @@ -37,11 +34,11 @@ public static Hashtable<String,String> table; - private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class); + private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - - static{ @@ -61,7 +58,7 @@ * @param msg * @author TaoNingBo */ - public void JudgeOperation(ChannelHandlerContext ctx, Object msg) { + public synchronized void JudgeOperation(ChannelHandlerContext ctx, Object msg) { try { // ByteBuf转String ByteBuf byteBuf = (ByteBuf) msg; @@ -98,171 +95,141 @@ String userId1 = jsonCon.getString("userId"); String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); + String businessType = jsonCon.getString("businessType");//1:打车,2=代驾 + String business = "2".equals(businessType) ? "daijia" : "dache"; if(StringUtil.isNotEmpty(userId1)){ - String fluid_control = redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); - if(!StringUtils.hasLength(fluid_control)){ - redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); - }else{ - long l = System.currentTimeMillis() - Long.valueOf(fluid_control); - if(l >= 10000){ - redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); - }else{ - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - return; - } - } - //判断用户或者司机长连接 + //判断用户或者司机长连接 if(type==1){ //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ - String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 + String token_ = redisUtil.getValue(business + ":USER_APP_"+ userId1);//获取缓存中最新的数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 - ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1); + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); - this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 - TimerTask timerTask = new TimerTask() { + boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 + if(b){ + NettyChannelMap.remove_(data_); + } + new Timer().schedule(new TimerTask() { @Override public void run() { - NettyChannelMap.remove(context); + NettyChannelMap.remove_(data_); } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); + }, 5000); } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("USER_" + userId1, token); - } + NettyChannelMap.update_(token.substring(token.length() - 16), ctx); + NettyChannelMap.update(business + ":USER" + userId1, ctx); + redisUtil.setStrValue(business + ":USER_APP_" + userId1, token); } + //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ - System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); - NettyChannelMap.update("USER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + NettyChannelMap.update(business + ":USER" + userId1, ctx); } }else{ - Map<String, Object> params = new HashMap<>(); - params.put("driverId", userId1); - HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/driverOnline/addDriverOnline"); - post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); - post.form(params); - HttpResponse execute = post.execute(); - if(200 != execute.getStatus()){ - System.err.println("调用driver-server添加司机在线数据出错了"); - } - JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class); - if(jsonObject.getIntValue("code") != 200){ - System.err.println("调用driver-server添加司机在线数据出错了"); - } - - //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班) - if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ - redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); - - String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 - if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){ - //如果是车载端登录,则将其它端都强迫下线 - JSONObject msg_ = new JSONObject(); - msg_.put("code", 200); - msg_.put("msg", "SUCCESS"); - msg_.put("method", "OFFLINE"); - msg_.put("data", new Object()); - this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - NettyChannelMap.remove(ctx); - } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); - } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("DRIVER_" + userId1, token); - } - } - - //确保账号在单个设备上登录 - String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); - if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作 - String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 + if(StringUtil.isNotEmpty(token)){//APP端登录的操作 + String token_ = redisUtil.getValue(business + ":DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据 - ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1); - JSONObject msg_ = new JSONObject(); - msg_.put("code", 200); - msg_.put("msg", "SUCCESS"); - msg_.put("method", "OFFLINE"); - msg_.put("data", new Object()); - this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - NettyChannelMap.remove(context); - } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); + ChannelHandlerContext data_ = NettyChannelMap.getData_(token_.substring(token_.length() - 16)); + if(null != data_){ + JSONObject msg_ = new JSONObject(); + msg_.put("code", 200); + msg_.put("msg", "SUCCESS"); + msg_.put("method", "OFFLINE"); + msg_.put("data", new Object()); + boolean b = this.sendMsgToClient(data_, msg_.toJSONString());//给当前通道发送消息 + if(b){ + NettyChannelMap.remove_(data_); + } + } } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("DRIVER_" + userId1, token); - } + NettyChannelMap.update(business + ":DRIVER" + userId1, ctx); + NettyChannelMap.update_(token.substring(token.length() - 16), ctx); + redisUtil.setStrValue(business + ":DRIVER_" + userId1, token); } - - - //存储通讯通道 + //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ - System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); - NettyChannelMap.update("DRIVER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + NettyChannelMap.update(business + ":DRIVER" + userId1, ctx); } } } + + if(null != ctx && ctx.channel().isActive()){ + jsonMsg.put("method", Method.pong); + sendMsgToClient(ctx, jsonMsg.toJSONString()); + } } //司机上传位置 if(method.equals(Method.location)){ Integer driverId = jsonCon.getInteger("driverId"); - String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId); + String businessType = jsonCon.getString("businessType");//1:打车,2=代驾 + String business = "2".equals(businessType) || null==businessType ? "daijia" : "dache"; + + String fluid_control = redisUtil.getValue(business + ":location_" + driverId); if(!StringUtils.hasLength(fluid_control)){ - redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); + redisUtil.setStrValue(business + ":location_" + driverId, System.currentTimeMillis() + ""); }else{ long l = System.currentTimeMillis() - Long.valueOf(fluid_control); if(l < 5000){ return; } - redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); + redisUtil.setStrValue(business + ":location_" + driverId, System.currentTimeMillis() + ""); } - + Integer orderId = jsonCon.getInteger("orderId"); Integer orderType = jsonCon.getInteger("orderType"); Double lon = jsonCon.getDouble("lon"); Double lat = jsonCon.getDouble("lat"); Double computeAzimuth = jsonCon.getDouble("computeAzimuth"); Double altitude = jsonCon.getDouble("altitude"); - System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId); + System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString()); if(SinataUtil.isNotEmpty(driverId)){ if(null != lon && 0 != lon && null != lat && 0 != lat){ - if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库 + if("dache".equals(business)){ + if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库 + Map<String, Object> params = new HashMap<>(); + params.put("orderType", String.valueOf(orderType)); + params.put("orderId", String.valueOf(orderId)); + params.put("driverId", String.valueOf(driverId)); + params.put("lon", String.valueOf(lon)); + params.put("lat", String.valueOf(lat)); + params.put("directionAngle", String.valueOf(computeAzimuth)); + params.put("altitude", String.valueOf(altitude)); + HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition"); + post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE); + post.form(params); + HttpResponse execute = post.execute(); + if(200 != execute.getStatus()){ + System.err.println("调用driver-server存储位置数据出错了"); + } + JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class); + if(jsonObject.getIntValue("code") != 200){ + System.err.println("调用driver-server存储位置数据出错了"); + } + } + } + + if("daijia".equals(business)){ + HttpHeaders headers = new HttpHeaders(); + // 以表单的方式提交 + headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); + //将请求头部和参数合成一个请求 Map<String, Object> params = new HashMap<>(); - params.put("orderType", String.valueOf(orderType)); - params.put("orderId", String.valueOf(orderId)); + params.put("orderType", null == orderType ? orderType : String.valueOf(orderType)); + params.put("orderId", null == orderId ? orderId : String.valueOf(orderId)); params.put("driverId", String.valueOf(driverId)); params.put("lon", String.valueOf(lon)); params.put("lat", String.valueOf(lat)); params.put("directionAngle", String.valueOf(computeAzimuth)); params.put("altitude", String.valueOf(altitude)); - HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition"); - post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); + HttpRequest post = HttpUtil.createPost(URLUtil.aj_zuul + "/driver-server/base/driver/addDriverPosition"); + post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE); post.form(params); HttpResponse execute = post.execute(); if(200 != execute.getStatus()){ @@ -273,13 +240,13 @@ System.err.println("调用driver-server存储位置数据出错了"); } } - System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat); - redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中 + + redisUtil.setStrValue(business + ":DRIVER" + driverId, lon + "," + lat, 30);//实时位置存入redis中 }else{ - NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + this.sendMsgToClient(ctx, "__error__" + msg.toString()); } }else{ - NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); + this.sendMsgToClient(ctx, "__error__" + msg.toString()); } } @@ -298,7 +265,7 @@ * @param msg * @author TaoNingBo */ - public static void sendMsgToClient(ChannelHandlerContext ctx, String msg) { + public static boolean sendMsgToClient(ChannelHandlerContext ctx, String msg) { if (ctx != null && ctx.channel().isActive()) { ByteBuf buffer = Unpooled.copiedBuffer((msg).getBytes()); ChannelFuture sync; @@ -318,7 +285,9 @@ if(b){ NettyChannelMap.remove(ctx); } + return true; } + return sync.isSuccess(); } catch (Exception e) { System.err.println("推送发生异常,记录:"+msg); NettyChannelMap.remove(ctx); @@ -330,6 +299,7 @@ System.err.println("推送失败,长连接不存在"); NettyChannelMap.remove(ctx); } + return false; } // **链接断开 将推送消息记录 -- Gitblit v1.7.1