From cd972838ea38c1bd5e742dc5298f32a6b8d6ca71 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期五, 27 六月 2025 15:47:14 +0800 Subject: [PATCH] 提交推送服务 --- MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java | 2 + MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java | 6 +- MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | 63 ++++++++----------------------- MessagePushTravel/pom.xml | 4 ++ MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java | 3 + MessagePushTravel/src/main/resources/application.yml | 3 + MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java | 39 +++++++++++++++++++ 7 files changed, 69 insertions(+), 51 deletions(-) diff --git a/MessagePushTravel/pom.xml b/MessagePushTravel/pom.xml index 22e9a9b..3829bb4 100644 --- a/MessagePushTravel/pom.xml +++ b/MessagePushTravel/pom.xml @@ -65,6 +65,10 @@ <artifactId>hutool-all</artifactId> <version>5.8.25</version> </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> </dependencies> <dependencyManagement> diff --git a/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java b/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java index 707c4a3..a4b1862 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java @@ -4,7 +4,10 @@ import com.sinata.push.util.echo.NettyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + +@EnableScheduling//开启定时任务 @SpringBootApplication public class MessagePushApplication { diff --git a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java index c0decdf..7056cec 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java @@ -33,6 +33,7 @@ @ResponseBody @PostMapping("/sendMsgToClient") public String sendMsgToClient(Integer id, Integer type, String msg){ + System.out.println("推送参数:" + id + "---" + type + "---" + msg); if(type == 1){//用户端 ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 if(null != channel){ @@ -49,6 +50,7 @@ } if(type == 2){//司机端 + System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map)); ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); if(null != channel){ nettyServerController.sendMsgToClient(channel, msg); diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java b/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java new file mode 100644 index 0000000..d6ab008 --- /dev/null +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java @@ -0,0 +1,39 @@ +package com.sinata.push.util; + +import com.sinata.push.util.echo.Method; +import com.sinata.push.util.echo.NettyChannelMap; +import com.sinata.push.util.echo.NettyMsg; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.HashMap; + +/** + * @author zhibing.pu + * @Date 2025/6/25 20:33 + */ +@Slf4j +@Component +public class TaskUtil { + + + @Scheduled(fixedRate = 1000) + public void taskMinute(){ + NettyChannelMap.map.keySet().forEach(key -> { + ChannelHandlerContext context = NettyChannelMap.map.get(key); + Channel channel = context.channel(); + if(context != null && channel.isActive()){ + String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + context.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + log.info("send channel:{}", key); + }else{ + NettyChannelMap.map.remove(key); + log.info("remove channel:{}", key); + } + }); + } +} diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java index acd89f0..228c11d 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java @@ -8,8 +8,8 @@ import java.util.concurrent.ConcurrentHashMap; public class NettyChannelMap { - - protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>(); + + public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>(); public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道 @@ -26,7 +26,7 @@ */ public static ChannelHandlerContext getData(String key) { if(map==null){ - map = new HashMap<String, ChannelHandlerContext>(); + map = new ConcurrentHashMap<String, ChannelHandlerContext>(); } return map.get(key); } diff --git a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java index 1c6782b..014705c 100644 --- a/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java +++ b/MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java @@ -115,6 +115,13 @@ //判断用户或者司机长连接 if(type==1){ + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); + NettyChannelMap.update("USER" + userId1, ctx); + String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + } //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 @@ -140,43 +147,14 @@ redisTemplate.opsForValue().set("USER_" + userId1, token); } } - //存储通讯通道 - if(null != ctx && ctx.channel().isActive()){ - System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); - NettyChannelMap.update("USER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } }else{ - //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班) - if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ - redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); - - String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 - if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){ - //如果是车载端登录,则将其它端都强迫下线 - JSONObject msg_ = new JSONObject(); - msg_.put("code", 200); - msg_.put("msg", "SUCCESS"); - msg_.put("method", "OFFLINE"); - msg_.put("data", new Object()); - this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 - TimerTask timerTask = new TimerTask() { - @Override - public void run() { - NettyChannelMap.remove(ctx); - } - }; - Timer timer = new Timer(); - timer.schedule(timerTask, 3000); - timer.cancel(); - } - if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 - redisTemplate.opsForValue().set("DRIVER_" + userId1, token); - } - } - - + //存储通讯通道 + if(null != ctx && ctx.channel().isActive()){ + System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); + NettyChannelMap.update("DRIVER" + userId1, ctx); + String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); + ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); + } //确保账号在单个设备上登录 String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作 @@ -203,15 +181,6 @@ redisTemplate.opsForValue().set("DRIVER_" + userId1, token); } } - - - //存储通讯通道 - if(null != ctx && ctx.channel().isActive()){ - System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); - NettyChannelMap.update("DRIVER" + userId1, ctx); - String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); - ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); - } } } } @@ -235,7 +204,7 @@ Double lat = jsonCon.getDouble("lat"); Double computeAzimuth = jsonCon.getDouble("computeAzimuth"); Double altitude = jsonCon.getDouble("altitude"); - System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId); + System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString()); if(SinataUtil.isNotEmpty(driverId)){ if(null != lon && 0 != lon && null != lat && 0 != lat){ if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库 @@ -248,7 +217,7 @@ params.put("directionAngle", String.valueOf(computeAzimuth)); params.put("altitude", String.valueOf(altitude)); HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition"); - post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); + post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE); post.form(params); HttpResponse execute = post.execute(); if(200 != execute.getStatus()){ diff --git a/MessagePushTravel/src/main/resources/application.yml b/MessagePushTravel/src/main/resources/application.yml index 05ef1b6..e3f3db1 100644 --- a/MessagePushTravel/src/main/resources/application.yml +++ b/MessagePushTravel/src/main/resources/application.yml @@ -2,7 +2,8 @@ port: 6000 spring: profiles: - active: prod + active: dev +# active: prod application: name: message #服务名称 servlet: -- Gitblit v1.7.1