From cd972838ea38c1bd5e742dc5298f32a6b8d6ca71 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期五, 27 六月 2025 15:47:14 +0800
Subject: [PATCH] 提交推送服务
---
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java | 2 +
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java | 6 +-
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 63 ++++++++-----------------------
MessagePushTravel/pom.xml | 4 ++
MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java | 3 +
MessagePushTravel/src/main/resources/application.yml | 3 +
MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java | 39 +++++++++++++++++++
7 files changed, 69 insertions(+), 51 deletions(-)
diff --git a/MessagePushTravel/pom.xml b/MessagePushTravel/pom.xml
index 22e9a9b..3829bb4 100644
--- a/MessagePushTravel/pom.xml
+++ b/MessagePushTravel/pom.xml
@@ -65,6 +65,10 @@
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
</dependencies>
<dependencyManagement>
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java b/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
index 707c4a3..a4b1862 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
@@ -4,7 +4,10 @@
import com.sinata.push.util.echo.NettyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@EnableScheduling//开启定时任务
@SpringBootApplication
public class MessagePushApplication {
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
index c0decdf..7056cec 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -33,6 +33,7 @@
@ResponseBody
@PostMapping("/sendMsgToClient")
public String sendMsgToClient(Integer id, Integer type, String msg){
+ System.out.println("推送参数:" + id + "---" + type + "---" + msg);
if(type == 1){//用户端
ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序
if(null != channel){
@@ -49,6 +50,7 @@
}
if(type == 2){//司机端
+ System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map));
ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id);
if(null != channel){
nettyServerController.sendMsgToClient(channel, msg);
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java b/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java
new file mode 100644
index 0000000..d6ab008
--- /dev/null
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java
@@ -0,0 +1,39 @@
+package com.sinata.push.util;
+
+import com.sinata.push.util.echo.Method;
+import com.sinata.push.util.echo.NettyChannelMap;
+import com.sinata.push.util.echo.NettyMsg;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+
+/**
+ * @author zhibing.pu
+ * @Date 2025/6/25 20:33
+ */
+@Slf4j
+@Component
+public class TaskUtil {
+
+
+ @Scheduled(fixedRate = 1000)
+ public void taskMinute(){
+ NettyChannelMap.map.keySet().forEach(key -> {
+ ChannelHandlerContext context = NettyChannelMap.map.get(key);
+ Channel channel = context.channel();
+ if(context != null && channel.isActive()){
+ String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+ context.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+ log.info("send channel:{}", key);
+ }else{
+ NettyChannelMap.map.remove(key);
+ log.info("remove channel:{}", key);
+ }
+ });
+ }
+}
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
index acd89f0..228c11d 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -8,8 +8,8 @@
import java.util.concurrent.ConcurrentHashMap;
public class NettyChannelMap {
-
- protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
+
+ public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道
@@ -26,7 +26,7 @@
*/
public static ChannelHandlerContext getData(String key) {
if(map==null){
- map = new HashMap<String, ChannelHandlerContext>();
+ map = new ConcurrentHashMap<String, ChannelHandlerContext>();
}
return map.get(key);
}
diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
index 1c6782b..014705c 100644
--- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
+++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -115,6 +115,13 @@
//判断用户或者司机长连接
if(type==1){
+ //存储通讯通道
+ if(null != ctx && ctx.channel().isActive()){
+ System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
+ NettyChannelMap.update("USER" + userId1, ctx);
+ String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+ ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+ }
//确保账号在单个设备上登录
if(StringUtil.isNotEmpty(token)){
String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
@@ -140,43 +147,14 @@
redisTemplate.opsForValue().set("USER_" + userId1, token);
}
}
- //存储通讯通道
- if(null != ctx && ctx.channel().isActive()){
- System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
- NettyChannelMap.update("USER" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
}else{
- //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班)
- if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){
- redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
-
- String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
- if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){
- //如果是车载端登录,则将其它端都强迫下线
- JSONObject msg_ = new JSONObject();
- msg_.put("code", 200);
- msg_.put("msg", "SUCCESS");
- msg_.put("method", "OFFLINE");
- msg_.put("data", new Object());
- this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息
- TimerTask timerTask = new TimerTask() {
- @Override
- public void run() {
- NettyChannelMap.remove(ctx);
- }
- };
- Timer timer = new Timer();
- timer.schedule(timerTask, 3000);
- timer.cancel();
- }
- if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
- redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
- }
- }
-
-
+ //存储通讯通道
+ if(null != ctx && ctx.channel().isActive()){
+ System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
+ NettyChannelMap.update("DRIVER" + userId1, ctx);
+ String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
+ ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
+ }
//确保账号在单个设备上登录
String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
@@ -203,15 +181,6 @@
redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
}
}
-
-
- //存储通讯通道
- if(null != ctx && ctx.channel().isActive()){
- System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
- NettyChannelMap.update("DRIVER" + userId1, ctx);
- String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
- ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
- }
}
}
}
@@ -235,7 +204,7 @@
Double lat = jsonCon.getDouble("lat");
Double computeAzimuth = jsonCon.getDouble("computeAzimuth");
Double altitude = jsonCon.getDouble("altitude");
- System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId);
+ System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString());
if(SinataUtil.isNotEmpty(driverId)){
if(null != lon && 0 != lon && null != lat && 0 != lat){
if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
@@ -248,7 +217,7 @@
params.put("directionAngle", String.valueOf(computeAzimuth));
params.put("altitude", String.valueOf(altitude));
HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition");
- post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType());
+ post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
post.form(params);
HttpResponse execute = post.execute();
if(200 != execute.getStatus()){
diff --git a/MessagePushTravel/src/main/resources/application.yml b/MessagePushTravel/src/main/resources/application.yml
index 05ef1b6..e3f3db1 100644
--- a/MessagePushTravel/src/main/resources/application.yml
+++ b/MessagePushTravel/src/main/resources/application.yml
@@ -2,7 +2,8 @@
port: 6000
spring:
profiles:
- active: prod
+ active: dev
+# active: prod
application:
name: message #服务名称
servlet:
--
Gitblit v1.7.1