Pu Zhibing
昨天 cd972838ea38c1bd5e742dc5298f32a6b8d6ca71
提交推送服务
1个文件已添加
6个文件已修改
120 ■■■■■ 已修改文件
MessagePushTravel/pom.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/src/main/resources/application.yml 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
MessagePushTravel/pom.xml
@@ -65,6 +65,10 @@
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
MessagePushTravel/src/main/java/com/sinata/push/MessagePushApplication.java
@@ -4,7 +4,10 @@
import com.sinata.push.util.echo.NettyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling//开启定时任务
@SpringBootApplication
public class MessagePushApplication {
MessagePushTravel/src/main/java/com/sinata/push/controller/NettyController.java
@@ -33,6 +33,7 @@
    @ResponseBody
    @PostMapping("/sendMsgToClient")
    public String sendMsgToClient(Integer id, Integer type, String msg){
        System.out.println("推送参数:" + id + "---" + type + "---" + msg);
        if(type == 1){//用户端
            ChannelHandlerContext channel = NettyChannelMap.getData("Applets" + id);//小程序
            if(null != channel){
@@ -49,6 +50,7 @@
        }
        if(type == 2){//司机端
            System.out.println("长链接实例:" + JSON.toJSONString(NettyChannelMap.map));
            ChannelHandlerContext channel = NettyChannelMap.getData("DRIVER" + id);
            if(null != channel){
                nettyServerController.sendMsgToClient(channel, msg);
MessagePushTravel/src/main/java/com/sinata/push/util/TaskUtil.java
New file
@@ -0,0 +1,39 @@
package com.sinata.push.util;
import com.sinata.push.util.echo.Method;
import com.sinata.push.util.echo.NettyChannelMap;
import com.sinata.push.util.echo.NettyMsg;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
/**
 * @author zhibing.pu
 * @Date 2025/6/25 20:33
 */
@Slf4j
@Component
public class TaskUtil {
    @Scheduled(fixedRate = 1000)
    public void taskMinute(){
        NettyChannelMap.map.keySet().forEach(key -> {
            ChannelHandlerContext context = NettyChannelMap.map.get(key);
            Channel channel = context.channel();
            if(context != null && channel.isActive()){
                String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                context.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                log.info("send channel:{}", key);
            }else{
                NettyChannelMap.map.remove(key);
                log.info("remove channel:{}", key);
            }
        });
    }
}
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyChannelMap.java
@@ -8,8 +8,8 @@
import java.util.concurrent.ConcurrentHashMap;
public class NettyChannelMap {
    protected static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
    public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();
    public static Map<String, ChannelHandlerContext> ctxMap = new HashMap<>();//单点登录存储的通道
@@ -26,7 +26,7 @@
     */
    public static ChannelHandlerContext getData(String key) {
        if(map==null){
            map = new HashMap<String, ChannelHandlerContext>();
            map = new ConcurrentHashMap<String, ChannelHandlerContext>();
        }
        return map.get(key);
    }
MessagePushTravel/src/main/java/com/sinata/push/util/echo/NettyServerController.java
@@ -115,6 +115,13 @@
                    //判断用户或者司机长连接
                    if(type==1){
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
                            NettyChannelMap.update("USER" + userId1, ctx);
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                        //确保账号在单个设备上登录
                        if(StringUtil.isNotEmpty(token)){
                            String token_ = (String)redisTemplate.opsForValue().get("USER_" + userId1);//获取缓存中最新的数据
@@ -140,43 +147,14 @@
                                redisTemplate.opsForValue().set("USER_" + userId1, token);
                            }
                        }
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            System.err.println("开始存储用户通道:" + sdf.format(new Date()) + "----" + userId1);
                            NettyChannelMap.update("USER" + userId1, ctx);
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                    }else{
                        //TODO 存储最后一次上传的时间(用于保证车载端断电后1小时自动下班)
                        if(StringUtil.isNotEmpty(device) && device.equals("carDevice")){
                            redisTemplate.opsForValue().set("DEVICE_" + userId1, String.valueOf(System.currentTimeMillis()));
                            String token_ = (String)redisTemplate.opsForValue().get("DRIVER_" + userId1);//缓存中拿最新数据
                            if(StringUtil.isNotEmpty(token_) && !token_.equals(token)){
                                //如果是车载端登录,则将其它端都强迫下线
                                JSONObject msg_ = new JSONObject();
                                msg_.put("code", 200);
                                msg_.put("msg", "SUCCESS");
                                msg_.put("method", "OFFLINE");
                                msg_.put("data", new Object());
                                this.sendMsgToClient(ctx, msg_.toJSONString());//给当前通道发送消息
                                TimerTask timerTask = new TimerTask() {
                                    @Override
                                    public void run() {
                                        NettyChannelMap.remove(ctx);
                                    }
                                };
                                Timer timer = new Timer();
                                timer.schedule(timerTask, 3000);
                                timer.cancel();
                            }
                            if(StringUtil.isEmpty(token_)){//确保登录的时候存储token失败的情况
                                redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
                            }
                        }
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
                            NettyChannelMap.update("DRIVER" + userId1, ctx);
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                        //确保账号在单个设备上登录
                        String value = (String)redisTemplate.opsForValue().get("DEVICE_" + userId1);
                        if(StringUtil.isNotEmpty(token) && StringUtil.isEmpty(value)){//APP端登录的操作
@@ -203,15 +181,6 @@
                                redisTemplate.opsForValue().set("DRIVER_" + userId1, token);
                            }
                        }
                        //存储通讯通道
                        if(null != ctx && ctx.channel().isActive()){
                            System.err.println("开始存储司机通道:" + sdf.format(new Date()) + "----" + userId1);
                            NettyChannelMap.update("DRIVER" + userId1, ctx);
                            String s = NettyMsg.setMsg(Method.ok, new HashMap<String, Object>());
                            ctx.writeAndFlush(Unpooled.copiedBuffer((s).getBytes()));
                        }
                    }
                }
            }
@@ -235,7 +204,7 @@
                Double lat = jsonCon.getDouble("lat");
                Double computeAzimuth = jsonCon.getDouble("computeAzimuth");
                Double altitude = jsonCon.getDouble("altitude");
                System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + driverId);
                System.out.println("司机上传位置:" + sdf.format(new Date()) + "----" + jsonCon.toJSONString());
                if(SinataUtil.isNotEmpty(driverId)){
                    if(null !=  lon && 0 != lon && null !=  lat && 0 != lat){
                        if(null != orderId && 0 != driverId && null != orderType && 0 != orderType){//开始存入数据库
@@ -248,7 +217,7 @@
                            params.put("directionAngle", String.valueOf(computeAzimuth));
                            params.put("altitude", String.valueOf(altitude));
                            HttpRequest post = HttpUtil.createPost(URLUtil.zuul + "/driver-server/base/savePosition");
                            post.contentType(MediaType.APPLICATION_FORM_URLENCODED.getType());
                            post.contentType(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
                            post.form(params);
                            HttpResponse execute = post.execute();
                            if(200 != execute.getStatus()){
MessagePushTravel/src/main/resources/application.yml
@@ -2,7 +2,8 @@
  port: 6000
spring:
  profiles:
    active: prod
    active: dev
#    active: prod
  application:
    name: message #服务名称
  servlet: