MessagePushTravel/pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
MessagePushTravel/src/main/resources/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
MessagePushTravel/pom.xml
@@ -65,6 +65,10 @@ <artifactId>hutool-all</artifactId> <version>5.8.25</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> <dependencyManagement> MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
@@ -4,7 +4,10 @@ import com.sinata.push.util.echo.NettyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling//开启定时任务 @SpringBootApplication public class MessagePushApplication { MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -33,6 +33,7 @@ @ResponseBody @PostMapping("/sendMsgToClient") public String sendMsgToClient(Integer id, Integer type, String msg){ System.out.println("推送参数:" + id + "---" + type + "---" + msg); if(type == 1){//用户端 ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序 if(null != channel){ @@ -49,6 +50,7 @@ } if(type == 2){//司机端 System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map)); ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id); if(null != channel){ nettyServerController.sendMsgToClient(channel, msg); MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java
New file @@ -0,0 +1,39 @@ package com.sinata.push.util; import com.sinata.push.util.echo.Method; import com.sinata.push.util.echo.NettyChannelMap; import com.sinata.push.util.echo.NettyMsg; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.HashMap; /** * @author zhibing.pu * @Date 2025/6/25 20:33 */ @Slf4j @Component public class TaskUtil { @Scheduled(fixedRate = 1000) public void taskMinute(){ NettyChannelMap.map.keySet().forEach(key -> { ChannelHandlerContext context = NettyChannelMap.map.get(key); Channel channel = context.channel(); if(context != null && channel.isActive()){ String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); context.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); log.info("send channel:{}", key); }else{ NettyChannelMap.map.remove(key); log.info("remove channel:{}", key); } }); } } MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -8,8 +8,8 @@ import java.util.concurrent.ConcurrentHashMap; public class NettyChannelMap { protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>(); public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>(); public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道 @@ -26,7 +26,7 @@ */ public static ChannelHandlerContext getData(String key) { if(map==null){ map = new HashMap<String, ChannelHandlerContext>(); map = new ConcurrentHashMap<String, ChannelHandlerContext>(); } return map.get(key); } MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -115,6 +115,13 @@ //判断用户或者司机长连接 if(type==1){ //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("USER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } //确保账号在单个设备上登录 if(StringUtil.isNotEmpty(token)){ String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据 @@ -140,43 +147,14 @@ redisTemplate.opsForValue().set("USER_" + userId1, token); } } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("USER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } }else{ //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班) if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){ redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis())); String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据 if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){ //如果是车载端登录,则将其它端都强迫下线 JSONObject msg_ = new JSONObject(); msg_.put("code", 200); msg_.put("msg", "SUCCESS"); msg_.put("method", "OFFLINE"); msg_.put("data", new Object()); this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息 TimerTask timerTask = new TimerTask() { @Override public void run() { NettyChannelMap.remove(ctx); } }; Timer timer = new Timer(); timer.schedule(timerTask, 3000); timer.cancel(); } if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况 redisTemplate.opsForValue().set("DRIVER_" + userId1, token); } } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } //确保账号在单个设备上登录 String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1); if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作 @@ -203,15 +181,6 @@ redisTemplate.opsForValue().set("DRIVER_" + userId1, token); } } //存储通讯通道 if(null != ctx && ctx.channel().isActive()){ System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1); NettyChannelMap.update("DRIVER" + userId1, ctx); String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>()); ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes())); } } } } @@ -235,7 +204,7 @@ Double lat = jsonCon.getDouble("lat"); Double computeAzimuth = jsonCon.getDouble("computeAzimuth"); Double altitude = jsonCon.getDouble("altitude"); System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId); System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString()); if(SinataUtil.isNotEmpty(driverId)){ if(null != lon && 0 != lon && null != lat && 0 != lat){ if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库 @@ -248,7 +217,7 @@ params.put("directionAngle", String.valueOf(computeAzimuth)); params.put("altitude", String.valueOf(altitude)); HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition"); post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType()); post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE); post.form(params); HttpResponse execute = post.execute(); if(200 != execute.getStatus()){ MessagePushTravel/src/main/resources/application.yml
@@ -2,7 +2,8 @@ port: 6000 spring: profiles: active: prod active: dev # active: prod application: name: message #服务名称 servlet: