From cd972838ea38c1bd5e742dc5298f32a6b8d6ca71 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期五, 27 六月 2025 15:47:14 +0800
Subject: [PATCH] 提交推送服务

---
 MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java      |    2 +
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java       |    6 +-
 MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java |   63 ++++++++-----------------------
 MessagePushTravel/pom.xml                                                            |    4 ++
 MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java          |    3 +
 MessagePushTravel/src/main/resources/application.yml                                 |    3 +
 MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java                   |   39 +++++++++++++++++++
 7 files changed, 69 insertions(+), 51 deletions(-)

diff --git a/MessagePushTravel/pom.xml b/MessagePushTravel/pom.xml
index 22e9a9b..3829bb4 100644
--- a/MessagePushTravel/pom.xml
+++ b/MessagePushTravel/pom.xml
@@ -65,6 +65,10 @@
             <artifactId>hutool-all</artifactId>
             <version>5.8.25</version>
         </dependency>
+	    <dependency>
+		    <groupId>org.projectlombok</groupId>
+		    <artifactId>lombok</artifactId>
+	    </dependency>
     </dependencies>
 
     <dependencyManagement>
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java b/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
index 707c4a3..a4b1862 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
@@ -4,7 +4,10 @@
 import com.sinata.push.util.echo.NettyServer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
+
+@EnableScheduling//开启定时任务
 @SpringBootApplication
 public class MessagePushApplication {
 
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
index c0decdf..7056cec 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -33,6 +33,7 @@
     @ResponseBody
     @PostMapping("/sendMsgToClient")
     public String sendMsgToClient(Integer id, Integer type, String msg){
+        System.out.println("推送参数:" + id + "---" + type + "---" + msg);
         if(type == 1){//用户端
             ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序
             if(null != channel){
@@ -49,6 +50,7 @@
         }
 
         if(type == 2){//司机端
+            System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map));
             ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id);
             if(null != channel){
                 nettyServerController.sendMsgToClient(channel, msg);
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java b/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java
new file mode 100644
index 0000000..d6ab008
--- /dev/null
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java
@@ -0,0 +1,39 @@
+package com.sinata.push.util;
+
+import com.sinata.push.util.echo.Method;
+import com.sinata.push.util.echo.NettyChannelMap;
+import com.sinata.push.util.echo.NettyMsg;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+
+/**
+ * @author zhibing.pu
+ * @Date 2025/6/25 20:33
+ */
+@Slf4j
+@Component
+public class TaskUtil {
+	
+	
+	@Scheduled(fixedRate = 1000)
+	public void taskMinute(){
+		NettyChannelMap.map.keySet().forEach(key -> {
+			ChannelHandlerContext context = NettyChannelMap.map.get(key);
+			Channel channel = context.channel();
+			if(context != null && channel.isActive()){
+				String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+				context.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+				log.info("send channel:{}", key);
+			}else{
+				NettyChannelMap.map.remove(key);
+				log.info("remove channel:{}", key);
+			}
+		});
+	}
+}
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
index acd89f0..228c11d 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -8,8 +8,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 public class NettyChannelMap {
-
-	protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
+	
+	public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
 
 	public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道
 
@@ -26,7 +26,7 @@
 	 */
 	public static ChannelHandlerContext getData(String key) {
 		if(map==null){
-			map = new HashMap<String, ChannelHandlerContext>();
+			map = new ConcurrentHashMap<String, ChannelHandlerContext>();
 		}
 		return map.get(key);
 	}
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
index 1c6782b..014705c 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -115,6 +115,13 @@
 
 					//判断用户或者司机长连接
                     if(type==1){
+	                    //存储通讯通道
+	                    if(null != ctx && ctx.channel().isActive()){
+		                    System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
+		                    NettyChannelMap.update("USER" + userId1, ctx);
+		                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+		                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+	                    }
                         //确保账号在单个设备上登录
                         if(StringUtil.isNotEmpty(token)){
                             String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
@@ -140,43 +147,14 @@
                                 redisTemplate.opsForValue().set("USER_" + userId1, token);
                             }
                         }
-						//存储通讯通道
-						if(null != ctx && ctx.channel().isActive()){
-							System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
-							NettyChannelMap.update("USER" + userId1, ctx);
-							String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-							ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-						}
                     }else{
-						//TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班)
-						if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){
-							redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
-
-							String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
-							if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){
-								//如果是车载端登录,则将其它端都强迫下线
-								JSONObject msg_ = new JSONObject();
-								msg_.put("code", 200);
-								msg_.put("msg", "SUCCESS");
-								msg_.put("method", "OFFLINE");
-								msg_.put("data", new Object());
-								this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息
-								TimerTask timerTask = new TimerTask() {
-									@Override
-									public void run() {
-										NettyChannelMap.remove(ctx);
-									}
-								};
-								Timer timer = new Timer();
-								timer.schedule(timerTask, 3000);
-								timer.cancel();
-							}
-							if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
-								redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
-							}
-						}
-
-
+	                    //存储通讯通道
+	                    if(null != ctx && ctx.channel().isActive()){
+		                    System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
+		                    NettyChannelMap.update("DRIVER" + userId1, ctx);
+		                    String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+		                    ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+	                    }
                         //确保账号在单个设备上登录
 						String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
 						if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
@@ -203,15 +181,6 @@
                                 redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
                             }
                         }
-
-
-                        //存储通讯通道
-                        if(null != ctx && ctx.channel().isActive()){
-                            System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
-                            NettyChannelMap.update("DRIVER" + userId1, ctx);
-                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
-                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
-                        }
                     }
                 }
 			}
@@ -235,7 +204,7 @@
 				Double lat = jsonCon.getDouble("lat");
 				Double computeAzimuth = jsonCon.getDouble("computeAzimuth");
 				Double altitude = jsonCon.getDouble("altitude");
-				System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId);
+				System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString());
 				if(SinataUtil.isNotEmpty(driverId)){
 					if(null !=  lon && 0 != lon && null !=  lat && 0 != lat){
 						if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
@@ -248,7 +217,7 @@
 							params.put("directionAngle", String.valueOf(computeAzimuth));
 							params.put("altitude", String.valueOf(altitude));
 							HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition");
-							post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType());
+							post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
 							post.form(params);
 							HttpResponse execute = post.execute();
 							if(200 != execute.getStatus()){
diff --git a/MessagePushTravel/src/main/resources/application.yml b/MessagePushTravel/src/main/resources/application.yml
index 05ef1b6..e3f3db1 100644
--- a/MessagePushTravel/src/main/resources/application.yml
+++ b/MessagePushTravel/src/main/resources/application.yml
@@ -2,7 +2,8 @@
   port: 6000
 spring:
   profiles:
-    active: prod
+    active: dev
+#    active: prod
   application:
     name: message #服务名称
   servlet:

--
Gitblit v1.7.1