From 22c308a8c1317473c7279f7bf866814c64ef36e9 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期日, 22 六月 2025 01:09:54 +0800 Subject: [PATCH] 提交推送服务 --- MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 170 ++++++++++++++++++++++++++++++-------------------------- 1 files changed, 92 insertions(+), 78 deletions(-) diff --git a/ZuulQYTTravel/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java similarity index 66% rename from ZuulQYTTravel/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java rename to MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java index 1765dae..4e003bd 100644 --- a/ZuulQYTTravel/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java @@ -1,20 +1,28 @@ -package com.sinata.zuul.util.echo; +package com.sinata.push.util.echo; +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.sinata.zuul.util.*; +import com.sinata.push.util.*; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; import org.springframework.web.client.RestTemplate; +import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.TimeUnit; /** @@ -29,11 +37,9 @@ public static Hashtable<String,String> table; - private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class); + private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class); - private GDMapGeocodingUtil gdMapGeocodingUtil = SpringUtil.getObject(GDMapGeocodingUtil.class); - - private RestTemplate internalRestTemplate = SpringUtil.getObject(RestTemplate.class); + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @@ -85,11 +91,6 @@ } JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString()); - if(null != ctx && ctx.channel().isActive()){ - jsonMsg.put("method", Method.pong); - sendMsgToClient(ctx, jsonMsg.toJSONString()); - } - //心跳 if(method.equals(Method.ping)) { Integer type = jsonCon.getInteger("type"); @@ -98,62 +99,74 @@ String device = jsonCon.getString("device"); String version = jsonCon.getString("version"); if(StringUtil.isNotEmpty(userId1)){ + String fluid_control = redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type); + if(!StringUtils.hasLength(fluid_control)){ + redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); + }else{ + long l = System.currentTimeMillis() - Long.valueOf(fluid_control); + if(l >= 10000){ + redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + ""); + }else{ + String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + return; + } + } - //判断用户或者司机长连接 + //判断用户或者司机长连接 if(type==1){ //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ - String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据 + String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据 - JSONObject msg_ = new JSONObject(); + ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1); + JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); - this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 + this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { - NettyChannelMap.remove_(ctx); - NettyChannelMap.remove(ctx); + NettyChannelMap.remove(context); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); - }else{ - NettyChannelMap.update_(token.substring(0, 23), ctx); - NettyChannelMap.update("USER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } + } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisUtil.setStrValue("USER_APP_" + userId1, token); + redisTemplate.opsForValue().set("USER_" + userId1, token); } } - + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); + NettyChannelMap.update("USER" + userId1, ctx); + String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + } }else{ - //添加司机在线 - HttpHeaders headers = new HttpHeaders(); - // 以表单的方式提交 - headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); - //将请求头部和参数合成一个请求 - MultiValueMap<String, Object> params = new LinkedMultiValueMap<>(); - params.add("driverId", userId1); - params.add("device", (null != device && device.equals("carDevice")) ? "2" : "1"); - params.add("version", version); - HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers); - String w = internalRestTemplate.postForObject("http://driver-server/base/driverOnline/addDriverOnline",requestEntity , String.class); - JSONObject jsonObject = JSON.parseObject(w, JSONObject.class); + Map<String, Object> params = new HashMap<>(); + params.put("driverId", userId1); + HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/driverOnline/addDriverOnline"); + post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); + post.form(params); + HttpResponse execute = post.execute(); + if(200 != execute.getStatus()){ + System.err.println("调用driver-server添加司机在线数据出错了"); + } + JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class); if(jsonObject.getIntValue("code") != 200){ System.err.println("调用driver-server添加司机在线数据出错了"); } //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班) if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ - redisUtil.setStrValue("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); + redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); - String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 + String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){ //如果是车载端登录,则将其它端都强迫下线 JSONObject msg_ = new JSONObject(); @@ -165,63 +178,50 @@ TimerTask timerTask = new TimerTask() { @Override public void run() { - NettyChannelMap.remove_(ctx); NettyChannelMap.remove(ctx); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); - }else{ -// System.err.println("开始存储司机通道" + userId1); - NettyChannelMap.update("DRIVER" + userId1, ctx); - NettyChannelMap.update_(token.substring(0, 23), ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisUtil.setStrValue("DRIVER_" + userId1, token); + redisTemplate.opsForValue().set("DRIVER_" + userId1, token); } } //确保账号在单个设备上登录 - String value = redisUtil.getValue("DEVICE_" + userId1); + String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作 - String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据 + String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据 - JSONObject msg_ = new JSONObject(); + ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1); + JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); - this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 + this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { - NettyChannelMap.remove_(ctx); - NettyChannelMap.remove(ctx); + NettyChannelMap.remove(context); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); - }else{ -// System.err.println("开始存储司机通道" + userId1); - NettyChannelMap.update("DRIVER" + userId1, ctx); - NettyChannelMap.update_(token.substring(0, 23), ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } + } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisUtil.setStrValue("DRIVER_" + userId1, token); + redisTemplate.opsForValue().set("DRIVER_" + userId1, token); } } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ -// System.err.println("开始存储司机通道" + userId1); + System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); @@ -232,35 +232,49 @@ //司机上传位置 if(method.equals(Method.location)){ Integer driverId = jsonCon.getInteger("driverId"); + String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId); + if(!StringUtils.hasLength(fluid_control)){ + redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); + }else{ + long l = System.currentTimeMillis() - Long.valueOf(fluid_control); + if(l < 5000){ + return; + } + redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + ""); + } + Integer orderId = jsonCon.getInteger("orderId"); Integer orderType = jsonCon.getInteger("orderType"); Double lon = jsonCon.getDouble("lon"); Double lat = jsonCon.getDouble("lat"); Double computeAzimuth = jsonCon.getDouble("computeAzimuth"); Double altitude = jsonCon.getDouble("altitude"); + System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId); if(SinataUtil.isNotEmpty(driverId)){ if(null != lon && 0 != lon && null != lat && 0 != lat){ if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库 - HttpHeaders headers = new HttpHeaders(); - // 以表单的方式提交 - headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); - //将请求头部和参数合成一个请求 - MultiValueMap<String, Object> params = new LinkedMultiValueMap<>(); - params.add("orderType", String.valueOf(orderType)); - params.add("orderId", String.valueOf(orderId)); - params.add("driverId", String.valueOf(driverId)); - params.add("lon", String.valueOf(lon)); - params.add("lat", String.valueOf(lat)); - params.add("directionAngle", String.valueOf(computeAzimuth)); - params.add("altitude", String.valueOf(altitude)); - HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers); - String s = internalRestTemplate.postForObject("http://driver-server/base/savePosition",requestEntity , String.class); - JSONObject jsonObject = JSON.parseObject(s, JSONObject.class); + Map<String, Object> params = new HashMap<>(); + params.put("orderType", String.valueOf(orderType)); + params.put("orderId", String.valueOf(orderId)); + params.put("driverId", String.valueOf(driverId)); + params.put("lon", String.valueOf(lon)); + params.put("lat", String.valueOf(lat)); + params.put("directionAngle", String.valueOf(computeAzimuth)); + params.put("altitude", String.valueOf(altitude)); + HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition"); + post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); + post.form(params); + HttpResponse execute = post.execute(); + if(200 != execute.getStatus()){ + System.err.println("调用driver-server存储位置数据出错了"); + } + JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class); if(jsonObject.getIntValue("code") != 200){ System.err.println("调用driver-server存储位置数据出错了"); } } - redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 300);//实时位置存入redis中 + System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat); + redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中 }else{ NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString()); } -- Gitblit v1.7.1