From 22c308a8c1317473c7279f7bf866814c64ef36e9 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期日, 22 六月 2025 01:09:54 +0800
Subject: [PATCH] 提交推送服务

---
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java |  170 ++++++++++++++++++++++++++++++--------------------------
 1 files changed, 92 insertions(+), 78 deletions(-)

diff --git a/ZuulQYTTravel/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
similarity index 66%
rename from ZuulQYTTravel/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java
rename to MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
index 1765dae..4e003bd 100644
--- a/ZuulQYTTravel/src/main/java/com/sinata/zuul/util/echo/NettyServerController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -1,20 +1,28 @@
-package com.sinata.zuul.util.echo;
+package com.sinata.push.util.echo;
 
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.HttpUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.sinata.zuul.util.*;
+import com.sinata.push.util.*;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
+import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
 
+import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -29,11 +37,9 @@
 
 	public static Hashtable<String,String> table;
 
-	private RedisUtil redisUtil = SpringUtil.getObject(RedisUtil.class);
+	private RedisTemplate<String, String> redisTemplate = SpringUtil.getObject(StringRedisTemplate.class);
 
-	private GDMapGeocodingUtil gdMapGeocodingUtil = SpringUtil.getObject(GDMapGeocodingUtil.class);
-
-	private RestTemplate internalRestTemplate = SpringUtil.getObject(RestTemplate.class);
+	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
 
 
@@ -85,11 +91,6 @@
 			}
 			JSONObject jsonCon = JSONObject.parseObject(jsonMsg.get("data").toString());
 
-			if(null != ctx && ctx.channel().isActive()){
-				jsonMsg.put("method", Method.pong);
-				sendMsgToClient(ctx, jsonMsg.toJSONString());
-			}
-
 			//心跳
 			if(method.equals(Method.ping)) {
 				Integer type = jsonCon.getInteger("type");
@@ -98,62 +99,74 @@
 				String device = jsonCon.getString("device");
 				String version = jsonCon.getString("version");
                 if(StringUtil.isNotEmpty(userId1)){
+					String fluid_control = redisTemplate.opsForValue().get("fluid_control_" + userId1 + "_" + type);
+					if(!StringUtils.hasLength(fluid_control)){
+						redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
+					}else{
+						long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
+						if(l >= 10000){
+							redisTemplate.opsForValue().set("fluid_control_" + userId1 + "_" + type, System.currentTimeMillis() + "");
+						}else{
+							String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+							ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+							return;
+						}
+					}
 
-                    //判断用户或者司机长连接
+					//判断用户或者司机长连接
                     if(type==1){
                         //确保账号在单个设备上登录
                         if(StringUtil.isNotEmpty(token)){
-                            String token_ = redisUtil.getValue("USER_APP_"+ userId1);//获取缓存中最新的数据
+                            String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
                             if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向其他设备发送数据
-                                JSONObject msg_ = new JSONObject();
+								ChannelHandlerContext context = NettyChannelMap.getData("USER" + userId1);
+								JSONObject msg_ = new JSONObject();
                                 msg_.put("code", 200);
                                 msg_.put("msg", "SUCCESS");
                                 msg_.put("method", "OFFLINE");
                                 msg_.put("data", new Object());
-								this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息
+								this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
                                 TimerTask timerTask = new TimerTask() {
                                     @Override
                                     public void run() {
-                                        NettyChannelMap.remove_(ctx);
-										NettyChannelMap.remove(ctx);
+										NettyChannelMap.remove(context);
                                     }
                                 };
                                 Timer timer = new Timer();
                                 timer.schedule(timerTask, 3000);
                                 timer.cancel();
-                            }else{
-								NettyChannelMap.update_(token.substring(0, 23), ctx);
-								NettyChannelMap.update("USER" + userId1, ctx);
-								String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-								ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-							}
+                            }
                             if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-                                redisUtil.setStrValue("USER_APP_" + userId1, token);
+                                redisTemplate.opsForValue().set("USER_" + userId1, token);
                             }
                         }
-
+						//存储通讯通道
+						if(null != ctx && ctx.channel().isActive()){
+							System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
+							NettyChannelMap.update("USER" + userId1, ctx);
+							String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+							ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+						}
                     }else{
-						//添加司机在线
-						HttpHeaders headers = new HttpHeaders();
-						// 以表单的方式提交
-						headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
-						//将请求头部和参数合成一个请求
-						MultiValueMap<String, Object> params = new LinkedMultiValueMap<>();
-						params.add("driverId", userId1);
-						params.add("device", (null != device && device.equals("carDevice")) ? "2" : "1");
-						params.add("version", version);
-						HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers);
-						String w = internalRestTemplate.postForObject("http://driver-server/base/driverOnline/addDriverOnline",requestEntity , String.class);
-						JSONObject jsonObject = JSON.parseObject(w, JSONObject.class);
+	                    Map<String, Object> params = new HashMap<>();
+						params.put("driverId", userId1);
+	                    HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/driverOnline/addDriverOnline");
+	                    post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType());
+	                    post.form(params);
+	                    HttpResponse execute = post.execute();
+						if(200 != execute.getStatus()){
+							System.err.println("调用driver-server添加司机在线数据出错了");
+						}
+	                    JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class);
 						if(jsonObject.getIntValue("code") != 200){
 							System.err.println("调用driver-server添加司机在线数据出错了");
 						}
 
 						//TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班)
 						if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){
-							redisUtil.setStrValue("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
+							redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
 
-							String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
+							String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
 							if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){
 								//如果是车载端登录,则将其它端都强迫下线
 								JSONObject msg_ = new JSONObject();
@@ -165,63 +178,50 @@
 								TimerTask timerTask = new TimerTask() {
 									@Override
 									public void run() {
-										NettyChannelMap.remove_(ctx);
 										NettyChannelMap.remove(ctx);
 									}
 								};
 								Timer timer = new Timer();
 								timer.schedule(timerTask, 3000);
 								timer.cancel();
-							}else{
-//								System.err.println("开始存储司机通道" + userId1);
-								NettyChannelMap.update("DRIVER" + userId1, ctx);
-								NettyChannelMap.update_(token.substring(0, 23), ctx);
-								String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-								ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
 							}
 							if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-								redisUtil.setStrValue("DRIVER_" + userId1, token);
+								redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
 							}
 						}
 
 
                         //确保账号在单个设备上登录
-						String value = redisUtil.getValue("DEVICE_" + userId1);
+						String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
 						if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
-                            String token_ = redisUtil.getValue("DRIVER_" + userId1);//缓存中拿最新数据
+                            String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
                             if(StringUtil.isNotEmpty(token_) && !token.equals(token_)){//不在同一设备上登录,向当前设备发送数据
-                                JSONObject msg_ = new JSONObject();
+								ChannelHandlerContext context = NettyChannelMap.getData("DRIVER" + userId1);
+								JSONObject msg_ = new JSONObject();
                                 msg_.put("code", 200);
                                 msg_.put("msg", "SUCCESS");
                                 msg_.put("method", "OFFLINE");
                                 msg_.put("data", new Object());
-								this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息
+								this.sendMsgToClient(context, msg_.toJSONString());//给当前通道发送消息
                                 TimerTask timerTask = new TimerTask() {
                                     @Override
                                     public void run() {
-                                        NettyChannelMap.remove_(ctx);
-										NettyChannelMap.remove(ctx);
+										NettyChannelMap.remove(context);
                                     }
                                 };
                                 Timer timer = new Timer();
                                 timer.schedule(timerTask, 3000);
                                 timer.cancel();
-                            }else{
-//								System.err.println("开始存储司机通道" + userId1);
-								NettyChannelMap.update("DRIVER" + userId1, ctx);
-								NettyChannelMap.update_(token.substring(0, 23), ctx);
-								String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-								ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-							}
+                            }
                             if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-                                redisUtil.setStrValue("DRIVER_" + userId1, token);
+                                redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
                             }
                         }
 
 
                         //存储通讯通道
                         if(null != ctx && ctx.channel().isActive()){
-//                            System.err.println("开始存储司机通道" + userId1);
+                            System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
                             NettyChannelMap.update("DRIVER" + userId1, ctx);
                             String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                             ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
@@ -232,35 +232,49 @@
 			//司机上传位置
 			if(method.equals(Method.location)){
 				Integer driverId = jsonCon.getInteger("driverId");
+				String fluid_control = (String)redisTemplate.opsForValue().get("location_" + driverId);
+				if(!StringUtils.hasLength(fluid_control)){
+					redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+				}else{
+					long l = System.currentTimeMillis() - Long.valueOf(fluid_control);
+					if(l < 5000){
+						return;
+					}
+					redisTemplate.opsForValue().set("location_" + driverId, System.currentTimeMillis() + "");
+				}
+
 				Integer orderId = jsonCon.getInteger("orderId");
 				Integer orderType = jsonCon.getInteger("orderType");
 				Double lon = jsonCon.getDouble("lon");
 				Double lat = jsonCon.getDouble("lat");
 				Double computeAzimuth = jsonCon.getDouble("computeAzimuth");
 				Double altitude = jsonCon.getDouble("altitude");
+				System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId);
 				if(SinataUtil.isNotEmpty(driverId)){
 					if(null !=  lon && 0 != lon && null !=  lat && 0 != lat){
 						if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
-							HttpHeaders headers = new HttpHeaders();
-							// 以表单的方式提交
-							headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
-							//将请求头部和参数合成一个请求
-							MultiValueMap<String, Object> params = new LinkedMultiValueMap<>();
-							params.add("orderType", String.valueOf(orderType));
-							params.add("orderId", String.valueOf(orderId));
-							params.add("driverId", String.valueOf(driverId));
-							params.add("lon", String.valueOf(lon));
-							params.add("lat", String.valueOf(lat));
-							params.add("directionAngle", String.valueOf(computeAzimuth));
-							params.add("altitude", String.valueOf(altitude));
-							HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(params, headers);
-							String s = internalRestTemplate.postForObject("http://driver-server/base/savePosition",requestEntity , String.class);
-							JSONObject jsonObject = JSON.parseObject(s, JSONObject.class);
+							Map<String, Object> params = new HashMap<>();
+							params.put("orderType", String.valueOf(orderType));
+							params.put("orderId", String.valueOf(orderId));
+							params.put("driverId", String.valueOf(driverId));
+							params.put("lon", String.valueOf(lon));
+							params.put("lat", String.valueOf(lat));
+							params.put("directionAngle", String.valueOf(computeAzimuth));
+							params.put("altitude", String.valueOf(altitude));
+							HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition");
+							post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType());
+							post.form(params);
+							HttpResponse execute = post.execute();
+							if(200 != execute.getStatus()){
+								System.err.println("调用driver-server存储位置数据出错了");
+							}
+							JSONObject jsonObject = JSON.parseObject(execute.body(), JSONObject.class);
 							if(jsonObject.getIntValue("code") != 200){
 								System.err.println("调用driver-server存储位置数据出错了");
 							}
 						}
-                        redisUtil.setStrValue("DRIVER" + driverId, lon + "," + lat, 300);//实时位置存入redis中
+						System.out.println("id:" + driverId + "---lon" + lon + "---lat" + lat);
+                        redisTemplate.opsForValue().set("DRIVER" + driverId, lon + "," + lat, 300, TimeUnit.SECONDS);//实时位置存入redis中
 					}else{
 						NettyServerController.sendMsgToClient(ctx, "__error__" + msg.toString());
 					}

--
Gitblit v1.7.1