Pu Zhibing
2025-03-14 3c66b754ee314ae87d0f2eda2fa86a30ea2304e7
修改809对接bug
17个文件已修改
535 ■■■■■ 已修改文件
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/dao/UPWarnMsgAdptInfoDao.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNWarnMsgUrgeTodoReq.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/GnssData.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/UPWarnMsgAdptInfo.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/enu/PositionAlarm.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClientHandler.java 116 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/server/NettyHandle.java 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/BaseMsgService.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java 93 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/CtrlMsgService.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ExgMsgService.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/PlatformMsgService.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/UPDisconnectReqService.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/UPLinkTestReqService.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/dao/UPWarnMsgAdptInfoDao.java
@@ -20,5 +20,6 @@
     * @param result
     * @return
     */
    List<UPWarnMsgAdptInfo> findByResultIs(int result);
    List<UPWarnMsgAdptInfo> findByResultIsAndPushTimeBefore(int result, long pushTime);
}
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNWarnMsgUrgeTodoReq.java
@@ -79,17 +79,15 @@
    public DOWNWarnMsgUrgeTodoReq build(UPWarnMsgAdptInfo upWarnMsgAdptInfo) {
        this.vehicleNo = upWarnMsgAdptInfo.getVehicleNo();
        this.vehicleColor = upWarnMsgAdptInfo.getVehicleColor();
        this.dataType = upWarnMsgAdptInfo.getDataType();
        this.dataLength = upWarnMsgAdptInfo.getDataLength();
        this.warnSrc = upWarnMsgAdptInfo.getWarnSrc();
        this.warnType = upWarnMsgAdptInfo.getWarnType();
        this.warnTime = upWarnMsgAdptInfo.getWarnTime();
        this.supervisionId = upWarnMsgAdptInfo.getInfoId();
        this.supervisionEndTime = LocalDateTime.now().plusDays(7).toEpochSecond(ZoneOffset.UTC);
        this.supervisionLevel = 0x01;
        this.supervisor = "admin";
        this.supervisorTel = "";
        this.supervisorEmail = "";
        this.supervisor = "谢茜";
        this.supervisorTel = "18808258011";
        this.supervisorEmail = "1360001080@qq.com";
        return this;
    }
    
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/GnssData.java
@@ -24,12 +24,12 @@
    /**
     * 日月年
     */
    @Field(type = FieldType.Integer)
    @Field(type = FieldType.Text)
    private String date;
    /**
     * 时分秒
     */
    @Field(type = FieldType.Integer)
    @Field(type = FieldType.Text)
    private String time;
    /**
     * 经度
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/UPWarnMsgAdptInfo.java
@@ -50,6 +50,15 @@
    private int warnSrc;
    /**
     * 报警类型
     * 0x0001: 超速报警
     * 0x0002: 疲劳驾驶报警
     * 0x0003: 紧急报警
     * 0x0004: 进入指定区域报警
     * 0x0005: 离开指定区域报警
     * 0x0006: 路段堵塞报警
     * 0x0007: 危险路段报警
     * 0x0008: 越界报警
     * 0x0009: 盗警
     */
    @Field(type = FieldType.Integer)
    private int warnType;
@@ -82,6 +91,11 @@
     */
    @Field(type = FieldType.Integer)
    private int result;
    /**
     * 报警督办发起时间
     */
    @Field(type = FieldType.Long)
    private long pushTime;
    
    
    /**
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/enu/PositionAlarm.java
@@ -2,6 +2,7 @@
/**
 * 位置相关报警类型
 *
 * @author zhibing.pu
 * @Date 2025/3/3 18:45
 */
@@ -28,7 +29,6 @@
    private Integer code;
    
    private String name;
    
    PositionAlarm(Integer code, String name) {
        this.code = code;
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java
@@ -1,11 +1,16 @@
package com.ruoyi.dataInterchange.netty.client;
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder;
import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder;
import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -14,6 +19,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
@@ -22,6 +28,7 @@
 * @author zhibing.pu
 * @Date 2025/3/3 20:11
 */
@Slf4j
public class NettyClient {
    /**
     * 连接IP
@@ -44,7 +51,7 @@
     *
     * @throws Exception
     */
    public void start(int code) throws Exception {
    public void start(int inferiorPlatformId, int verifyCode) throws Exception {
        EventLoopGroup nioEventLoopGroup = null;
        try {
            //创建Bootstrap对象用来引导启动客户端
@@ -66,14 +73,32 @@
                            //  空闲检测处理器 触发空闲状态事件            读空闲:5秒      写空闲:7秒  读写空闲:10秒
                            pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS));
                            // 处理器
//                            pipeline.addLast("handler", new NettyHandle());
                            pipeline.addLast("handler", new NettyClientHandler());
                        }
                    });
            // • 调用Bootstrap.connect()来连接服务器
            ChannelFuture f = bootstrap.connect().sync();
            //将通道添加到缓存中,便于后期直接使用
            Channel channel = f.channel();
            ChannelMap.addClientChannel(code, channel);
            ChannelMap.addClientChannel(inferiorPlatformId, channel);
            log.info("从链路连接成功");
            //构建从链路请求
            DOWNConnectReq downConnectReq = new DOWNConnectReq();
            downConnectReq.setVerifyCode(verifyCode);
            log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
            byte[] body = downConnectReq.encode();
            OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
            channel.writeAndFlush(out);
            channel.flush();
            //缓存从链路地址
            UPConnect upConnect = new UPConnect();
            upConnect.setDownLinkIp(host);
            upConnect.setDownLinkPort(port);
            upConnect.setVerifyCode(verifyCode);
            ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
            // • 最后关闭EventLoopGroup来释放资源
            f.channel().closeFuture().sync();
        } finally {
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClientHandler.java
@@ -1,21 +1,58 @@
package com.ruoyi.dataInterchange.netty.client;
import io.netty.buffer.ByteBuf;
import com.ruoyi.common.core.utils.SpringUtils;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.server.*;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
/**
 * @author zhibing.pu
 * @Date 2025/3/3 20:13
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Slf4j
@Component
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    
    public static List<ChannelHandlerContext> cts = new ArrayList<ChannelHandlerContext>();
    @Resource
    private ConnectReqService connectReqService = SpringUtils.getBean(ConnectReqService.class);
    @Resource
    private UPDisconnectReqService upDisconnectReqService = SpringUtils.getBean(UPDisconnectReqService.class);
    @Resource
    private UPLinkTestReqService upLinkTestReqService = SpringUtils.getBean(UPLinkTestReqService.class);
    @Resource
    private UPDisconnectInformService upDisconnectInformService = SpringUtils.getBean(UPDisconnectInformService.class);
    @Resource
    private UPCloseLinkInformService upCloseLinkInformService = SpringUtils.getBean(UPCloseLinkInformService.class);
    @Resource
    private DOWNConnectRspService downConnectRspService = SpringUtils.getBean(DOWNConnectRspService.class);
    @Resource
    private ExgMsgService exgMsgService = SpringUtils.getBean(ExgMsgService.class);
    @Resource
    private PlatformMsgService platformMsgService = SpringUtils.getBean(PlatformMsgService.class);
    @Resource
    private WarnMsgService warnMsgService = SpringUtils.getBean(WarnMsgService.class);
    @Resource
    private CtrlMsgService ctrlMsgService = SpringUtils.getBean(CtrlMsgService.class);
    @Resource
    private BaseMsgService baseMsgService = SpringUtils.getBean(BaseMsgService.class);
    
    
    /**
@@ -26,34 +63,24 @@
        System.err.println("客户端和服务端已建立连接");
    }
    
    public void write(ChannelHandlerContext ctx , String mess) throws Exception {
    public void write(ChannelHandlerContext ctx, String mess) throws Exception {
        String sendInfo = mess;
        ctx.writeAndFlush(Unpooled.copiedBuffer(sendInfo, CharsetUtil.UTF_8)); // 必须有flush
        ctx.flush();
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //读取数据
        //读取数据
        ByteBuf buf1 = (ByteBuf) msg;
        byte[] req = readClientData((ByteBuf) msg);
        String body = new String(req, "UTF-8"); //获取到的值
        System.out.println("客户端的数据------>"+body);
        //写数据
        write(ctx,"wits写的数据");
    public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception {
        OuterPacket outerPacket = (OuterPacket) object;
        int id = outerPacket.getId();
        serviceRouting(DataType.getDataType(id), ctx, outerPacket);
    }
    
    //将netty的数据装换为字节数组
    private byte[] readClientData(ByteBuf msg) {
        ByteBuf buf = msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        buf.release();
        return req;
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
    }
    
    /**
     * channelInactive
@@ -66,8 +93,45 @@
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cts.remove(ctx);
        ctx.close();
        System.out.println("异常退出:" + cause.getMessage());
    }
    /**
     * 业务路由
     *
     * @param dataType
     * @param ctx
     * @param out
     */
    public void serviceRouting(DataType dataType, ChannelHandlerContext ctx, OuterPacket out) {
        log.info("从链路信息交换响应({}):{}", dataType.getCode(), out);
        switch (dataType) {
            case UP_DISCONNECT_INFORM:
                log.info("主链路断开通知请求({}):{}", DataType.UP_DISCONNECT_INFORM.getCode(), out);
                upDisconnectInformService.disconnect(ctx, out);
                break;
            case UP_CLOSELINK_INFORM:
                log.info("下级平台主动关闭主从链路通知({}):{}", DataType.UP_CLOSELINK_INFORM.getCode(), out);
                upCloseLinkInformService.closeLinkInform(ctx, out);
                break;
            case DOWN_CONNECT_RSP:
                log.info("从链路连接应答({}):{}", DataType.DOWN_CONNECT_RSP.getCode(), out);
                downConnectRspService.connectRsp(ctx, out);
                break;
            case DOWN_DISCONNECT_RSP:
                log.info("从链路注销应答({}):{}", DataType.DOWN_DISCONNECT_RSP.getCode(), out);
                break;
            case DOWN_LINKTEST_RSP:
                log.info("从链路连接保持应答({}):{}", DataType.DOWN_LINKTEST_RSP.getCode(), out);
                break;
            case UP_MANAGE_TOTAL_RECV_BACK_MSG:
                log.info("发送车辆定位信息数量通知({}):{}", DataType.UP_MANAGE_TOTAL_RECV_BACK_MSG.getCode(), out);
                break;
            default:
                break;
        }
    }
}
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/server/NettyHandle.java
@@ -10,11 +10,10 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * 自定义handler
 *
 * @author zhibing.pu
 * @Date 2025/3/3 19:30
 */
@@ -56,7 +55,6 @@
    private BaseMsgService baseMsgService = SpringUtils.getBean(BaseMsgService.class);
    
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception {
        OuterPacket outerPacket = (OuterPacket) object;
@@ -77,7 +75,7 @@
        //数据头
        String head = msg.substring(0, 2); //--头标识
        String tail = msg.substring(msg.length() - 2);
        String datalength= msg.substring(2, 10);//--数据头->数据长度
        String datalength = msg.substring(2, 10);//--数据头->数据长度
        String dataSeqNo = msg.substring(10, 18);// --数据头->报文序列号
        String bizdata = msg.substring(18, 22);// --数据头->业务数据类型
        String code = msg.substring(22, 30); //--数据头->下级平台接入码,上级平台给下级平台分配唯一标识码
@@ -88,7 +86,7 @@
        
        //数据体
        String body = msg.substring(62, msg.length() - 2);
    }
    
    
@@ -113,19 +111,16 @@
    }
    
    
    /**
     * 业务路由
     *
     * @param dataType
     * @param ctx
     * @param out
     */
    public void serviceRouting(DataType dataType, ChannelHandlerContext ctx, OuterPacket out){
        switch (dataType){
    public void serviceRouting(DataType dataType, ChannelHandlerContext ctx, OuterPacket out) {
        log.info("主链路信息交换响应({}):{}", dataType.getCode(), out);
        switch (dataType) {
            case UP_CONNECT_REQ:
                log.info("主链路登录请求({}):{}", DataType.UP_CONNECT_REQ.getCode(), out);
                connectReqService.connect(ctx, out);
@@ -137,33 +132,6 @@
            case UP_LINKTEST_REQ:
                log.info("主链路连接保持请求({}):{}", DataType.UP_LINKTEST_REQ.getCode(), out);
                upLinkTestReqService.linkTest(ctx, out);
                break;
            case UP_DISCONNECT_INFORM:
                log.info("主链路断开通知请求({}):{}", DataType.UP_DISCONNECT_INFORM.getCode(), out);
                upDisconnectInformService.disconnect(ctx, out);
                break;
            case UP_CLOSELINK_INFORM:
                log.info("下级平台主动关闭主从链路通知({}):{}", DataType.UP_CLOSELINK_INFORM.getCode(), out);
                upCloseLinkInformService.closeLinkInform(ctx, out);
                break;
            case DOWN_CONNECT_RSP:
                log.info("从链路连接应答({}):{}", DataType.DOWN_CONNECT_RSP.getCode(), out);
                downConnectRspService.connectRsp(ctx, out);
                break;
            case DOWN_DISCONNECT_RSP:
                log.info("从链路注销应答({}):{}", DataType.DOWN_DISCONNECT_RSP.getCode(), out);
                break;
            case DOWN_LINKTEST_RSP:
                log.info("从链路连接保持应答({}):{}", DataType.DOWN_LINKTEST_RSP.getCode(), out);
                break;
            case UP_MANAGE_TOTAL_RECV_BACK_MSG:
                log.info("发送车辆定位信息数量通知({}):{}", DataType.UP_MANAGE_TOTAL_RECV_BACK_MSG.getCode(), out);
                break;
            case UP_MANAGE_MSG_RSP:
                log.info("平台链路连接情况与车辆定位消息传输情况上报应答消息({}):{}", DataType.UP_MANAGE_MSG_RSP.getCode(), out);
                break;
            case UP_MANAGE_MSG_SN_INFORM:
                log.info("上传平台间消息序列号通知({}):{}", DataType.UP_MANAGE_MSG_SN_INFORM.getCode(), out);
                break;
            case UP_EXG_MSG:
                log.info("主链路车辆动态信息交换({}):{}", DataType.UP_EXG_MSG.getCode(), out);
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/BaseMsgService.java
@@ -4,7 +4,10 @@
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author zhibing.pu
@@ -14,10 +17,18 @@
@Component
public class BaseMsgService {
    
    @Resource
    private RedisTemplate redisTemplate;
    
    public void up_base_msg(ChannelHandlerContext ctx, OuterPacket out){
    public void up_base_msg(ChannelHandlerContext ctx, OuterPacket out) {
        if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
            log.error("链路还未登录校验,拒绝连接");
            ctx.close();
            return;
        }
        DataType dataType = DataType.getDataType(out.getId());
        switch (dataType){
        switch (dataType) {
            case UP_BASE_MSG_VEHICLE_ADDED_ACK:
                log.info("补报车辆静态信息应答消息({}):{}", DataType.UP_BASE_MSG_VEHICLE_ADDED_ACK.getCode(), out);
                break;
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
@@ -1,7 +1,6 @@
package com.ruoyi.dataInterchange.server;
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.DOWNDisconnectInform;
import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.UPConnectRsp;
@@ -9,20 +8,20 @@
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.netty.client.NettyClient;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -35,6 +34,9 @@
    
    @Resource
    private EnterpriseClient enterpriseClient;
    @Resource
    private RedisTemplate redisTemplate;
    
    
    /**
@@ -74,10 +76,11 @@
        ctx.writeAndFlush(out);
        ctx.flush();
        if (upConnectRsp.getResult() == 0x00) {
            redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES);
            //保存链路
            ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel());
            //从链路连接
//            downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
            downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
        } else {
            ctx.close();
        }
@@ -94,9 +97,7 @@
    public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) {
        try {
            boolean b = downConnect(inferiorPlatformId, host, port, verifyCode);
            if (b) {
                downLinkTest(inferiorPlatformId);
            }
        } catch (Exception e) {
            downDisconnectInform(ctx, 0x00);
            throw new RuntimeException(e);
@@ -113,70 +114,24 @@
     */
    public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) {
        try {
            new NettyClient(host, port).start(inferiorPlatformId);
            //构建从链路请求
            DOWNConnectReq downConnectReq = new DOWNConnectReq();
            downConnectReq.setVerifyCode(verifyCode);
            //获取从链路通道
            Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
            if (null != channel && channel.isActive()) {
                log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
                byte[] body = downConnectReq.encode();
                OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
                channel.writeAndFlush(out);
                channel.flush();
                //缓存从链路地址
                UPConnect upConnect = new UPConnect();
                upConnect.setDownLinkIp(host);
                upConnect.setDownLinkPort(port);
                upConnect.setVerifyCode(verifyCode);
                ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
                return true;
            }
            ExecutorService executorService = new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        new NettyClient(host, port).start(inferiorPlatformId, verifyCode);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 从链路连接保持请求
     *
     * @param inferiorPlatformId
     */
    public void downLinkTest(int inferiorPlatformId) {
        //创建定时任务间隔发送链接保持请求
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //获取从链路通道
                Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
                if (null != channel && channel.isActive()) {
                    log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
                    OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
                    channel.writeAndFlush(out);
                    channel.flush();
                } else {
                    //记录失败次数,然后再重新连接
                    int times = ChannelMap.getTimes(inferiorPlatformId);
                    if (times >= 18) {
                        UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                        boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                        if (b) {
                            times = 0;
                        } else {
                            times++;
                        }
                    } else {
                        times++;
                    }
                    ChannelMap.saveTimes(inferiorPlatformId, times);
                }
            }
        }, 10, 10, TimeUnit.SECONDS);
    }
    
    
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/CtrlMsgService.java
@@ -1,7 +1,6 @@
package com.ruoyi.dataInterchange.server;
import com.ruoyi.dataInterchange.model.CtrlMag;
import com.ruoyi.dataInterchange.model.WarnMsg;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
@@ -9,7 +8,10 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author zhibing.pu
@@ -19,10 +21,19 @@
@Component
public class CtrlMsgService {
    
    public void up_ctrl_msg(ChannelHandlerContext ctx, OuterPacket out){
    @Resource
    private RedisTemplate redisTemplate;
    public void up_ctrl_msg(ChannelHandlerContext ctx, OuterPacket out) {
        if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
            log.error("链路还未登录校验,拒绝连接");
            ctx.close();
            return;
        }
        CtrlMag ctrlMsg = getCtrlMsg(out);
        DataType dataType = DataType.getDataType(ctrlMsg.getDataType());
        switch (dataType){
        switch (dataType) {
            case UP_CTRL_MSG_MONITOR_VEHICLE_ACK:
                log.info("车辆单向监听应答消息({}):{}", DataType.UP_CTRL_MSG_MONITOR_VEHICLE_ACK.getCode(), out);
                break;
@@ -46,6 +57,7 @@
    
    /**
     * 解析子业务数据
     *
     * @param out
     * @return
     */
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java
@@ -1,16 +1,29 @@
package com.ruoyi.dataInterchange.server;
import com.ruoyi.dataInterchange.model.DOWNConnectRsp;import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.DOWNConnectRsp;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 从链路连接应答
 *
 * @author zhibing.pu
 * @Date 2025/3/6 16:11
 */
@@ -18,15 +31,68 @@
@Component
public class DOWNConnectRspService {
    
    @Resource
    private RedisTemplate redisTemplate;
    @Resource
    private ConnectReqService connectReqService;
    private Map<Integer, ScheduledExecutorService> scheduledMap = new HashMap<>();
    
    /**
     * 从链路连接应答
     *
     * @param ctx
     * @param outerPacket
     */
    public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket){
    public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket) {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
        DOWNConnectRsp downConnectRsp = new DOWNConnectRsp().decode(byteBuf);
        //从链路保持请求
        downLinkTest(outerPacket.getGnsscenterId());
    }
    /**
     * 从链路连接保持请求
     *
     * @param inferiorPlatformId
     */
    public void downLinkTest(int inferiorPlatformId) {
        ScheduledExecutorService scheduledExecutorService = scheduledMap.get(inferiorPlatformId);
        if (null == scheduledExecutorService) {
            //创建定时任务间隔发送链接保持请求
            scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    //获取从链路通道
                    Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
                    if (null != channel && channel.isActive()) {
                        OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
                        channel.writeAndFlush(out);
                        log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
                        redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES);
                    } else {
                        //记录失败次数,然后再重新连接
                        int times = ChannelMap.getTimes(inferiorPlatformId);
                        if (times >= 18) {
                            UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                            boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                            if (b) {
                                times = 0;
                            } else {
                                times++;
                            }
                        } else {
                            times++;
                        }
                        ChannelMap.saveTimes(inferiorPlatformId, times);
                    }
                }
            }, 10, 10, TimeUnit.SECONDS);
            scheduledMap.put(inferiorPlatformId, scheduledExecutorService);
        }
    }
}
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ExgMsgService.java
@@ -12,6 +12,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@@ -45,10 +46,19 @@
    @Resource
    private UPExgMsgTakeEwayBillAckDao upExgMsgTakeEwayBillAckDao;
    
    @Resource
    private RedisTemplate redisTemplate;
    
    public void up_exg_msg(ChannelHandlerContext ctx, OuterPacket out) {
        if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
            log.error("链路还未登录校验,拒绝连接");
            ctx.close();
            return;
        }
        UPExgMsg exgMsg = getExgMsg(out);
        DataType dataType = DataType.getDataType(exgMsg.getDataType());
        log.info("主链路车辆动态信息交换子业务({}):{}", dataType.getCode(), out);
        switch (dataType) {
            case UP_EXG_MSG_REGISTER:
                log.info("上传车辆注册信息({}):{}", DataType.UP_EXG_MSG_REGISTER.getCode(), out);
@@ -152,15 +162,13 @@
        downExgMsgReportDriverInfo.setVehicleColor(VehicleColor);
        downExgMsgReportDriverInfo.setDataType(DataType.DOWN_EXG_MSG_REPORT_DRIVER_INFO.getCode());
        downExgMsgReportDriverInfo.setDataLength(0);
        log.info("上报驾驶员身份识别信息请求({}):{}", DataType.DOWN_EXG_MSG_REPORT_DRIVER_INFO.getCode(), JSON.toJSONString(downExgMsgReportDriverInfo));
        byte[] body = downExgMsgReportDriverInfo.encode();
        OuterPacket out = new OuterPacket(DataType.DOWN_EXG_MSG_REPORT_DRIVER_INFO.getCode(), body);
        //获取从链路通道
        Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
        if (null != channel && channel.isActive()) {
            channel.writeAndFlush(out);
        } else {
            ctx.writeAndFlush(out);
            log.info("上报驾驶员身份识别信息请求({}):{}", DataType.DOWN_EXG_MSG_REPORT_DRIVER_INFO.getCode(), JSON.toJSONString(downExgMsgReportDriverInfo));
        }
    }
    
@@ -196,22 +204,23 @@
     * @param inferiorPlatformId
     */
    public void down_exg_msg_take_ewaybill_req(int inferiorPlatformId, String vehicleNo, int VehicleColor) {
        if (!redisTemplate.hasKey("login:" + inferiorPlatformId)) {
            log.error("链路还未登录校验,拒绝连接");
            return;
        }
        DOWNExgMsgTakeEwaybillReq downExgMsgTakeEwaybillReq = new DOWNExgMsgTakeEwaybillReq();
        downExgMsgTakeEwaybillReq.setVehicleNo(vehicleNo);
        downExgMsgTakeEwaybillReq.setVehicleColor(VehicleColor);
        downExgMsgTakeEwaybillReq.setDataType(DataType.DOWN_EXG_MSG_REPORT_DRIVER_INFO.getCode());
        downExgMsgTakeEwaybillReq.setDataType(DataType.DOWN_EXG_MSG_TAKE_EWAYBILL_REQ.getCode());
        downExgMsgTakeEwaybillReq.setDataLength(0);
        
        log.info("上报车辆电子运单请求({}):{}", DataType.DOWN_EXG_MSG_TAKE_EWAYBILL_REQ.getCode(), JSON.toJSONString(downExgMsgTakeEwaybillReq));
        byte[] body = downExgMsgTakeEwaybillReq.encode();
        OuterPacket out = new OuterPacket(DataType.DOWN_EXG_MSG_TAKE_EWAYBILL_REQ.getCode(), body);
        //获取从链路通道
        Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
        if (null != channel && channel.isActive()) {
            channel.writeAndFlush(out);
        } else {
            channel = ChannelMap.getServerChannel(inferiorPlatformId);
            channel.writeAndFlush(out);
            log.info("上报车辆电子运单请求({}):{}", DataType.DOWN_EXG_MSG_TAKE_EWAYBILL_REQ.getCode(), JSON.toJSONString(downExgMsgTakeEwaybillReq));
        }
    }
    
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/PlatformMsgService.java
@@ -1,16 +1,16 @@
package com.ruoyi.dataInterchange.server;
import com.ruoyi.dataInterchange.model.UPExgMsg;
import com.ruoyi.dataInterchange.model.UPPlatformMsg;
import com.ruoyi.dataInterchange.model.UPPlatformMsgInfoAck;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author zhibing.pu
@@ -20,18 +20,24 @@
@Component
public class PlatformMsgService {
    
    public void up_platform_msg(ChannelHandlerContext ctx, OuterPacket out){
    @Resource
    private RedisTemplate redisTemplate;
    public void up_platform_msg(ChannelHandlerContext ctx, OuterPacket out) {
        if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
            log.error("链路还未登录校验,拒绝连接");
            ctx.close();
            return;
        }
        UPPlatformMsg platformMsg = getPlatformMsg(out);
        DataType dataType = DataType.getDataType(platformMsg.getDataType());
        switch (dataType){
        switch (dataType) {
            case UP_PLATFORM_MSG_POST_QUERY_ACK:
                log.info("平台查岗应答消息({}):{}", DataType.UP_PLATFORM_MSG_POST_QUERY_ACK.getCode(), platformMsg);
                break;
            case UP_PLATFORM_MSG_INFO_ACK:
                log.info("下发平台间报文应答消息({}):{}", DataType.UP_PLATFORM_MSG_INFO_ACK.getCode(), platformMsg);
                break;
            case UP_PLATFORM_MSG_RETRAN_REQ:
                log.info("上传平台间消息补传请求({}):{}", DataType.UP_PLATFORM_MSG_RETRAN_REQ.getCode(), platformMsg);
                break;
            default:
                break;
@@ -39,9 +45,9 @@
    }
    
    
    /**
     * 解析子业务数据
     *
     * @param out
     * @return
     */
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/UPDisconnectReqService.java
@@ -3,15 +3,14 @@
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.UPDisconnectReq;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -28,25 +27,29 @@
    @Resource
    private EnterpriseClient enterpriseClient;
    
    @Resource
    private RedisTemplate redisTemplate;
    
    /**
     * 主链路注销
     *
     * @param ctx
     * @param out
     */
    public void disconnect(ChannelHandlerContext ctx, OuterPacket out){
    public void disconnect(ChannelHandlerContext ctx, OuterPacket out) {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(out.getBody());
        UPDisconnectReq disconnectReq = new UPDisconnectReq().decode(byteBuf);
        Enterprise data = enterpriseClient.getEnterprise(disconnectReq.getUserId() + "").getData();
        if(null != data){
            if(!data.getPassword().equals(disconnectReq.getPassword())){
        if (null != data) {
            if (!data.getPassword().equals(disconnectReq.getPassword())) {
                ctx.close();
                return;
            }
            OuterPacket rep = new OuterPacket(DataType.UP_DISCONNECT_RSP.getCode(), null);
            log.info("主链路注销应答({}):{}", DataType.UP_DISCONNECT_RSP.getCode(), JSON.toJSONString(rep));
            ctx.writeAndFlush(rep);
            ctx.flush();
            redisTemplate.delete("login:" + out.getGnsscenterId());
        }
    }
}
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/UPLinkTestReqService.java
@@ -6,7 +6,10 @@
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
@@ -17,6 +20,9 @@
@Component
public class UPLinkTestReqService {
    
    @Resource
    private RedisTemplate redisTemplate;
    
    /**
     * 主链路连接保持
@@ -25,11 +31,15 @@
     * @param out
     */
    public void linkTest(ChannelHandlerContext ctx, OuterPacket out) {
        if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
            log.error("链路还未登录校验,拒绝连接");
            ctx.close();
            return;
        }
        int gnsscenterId = out.getGnsscenterId();
        ChannelMap.addServerChannel(gnsscenterId, ctx.channel());
        OuterPacket rep = new OuterPacket(DataType.UP_LINKTEST_RSP.getCode(), null);
        log.info("主链路连接保持应答({}):{}", DataType.UP_LINKTEST_RSP.getCode(), JSON.toJSONString(rep));
        ctx.writeAndFlush(rep);
        ctx.flush();
    }
}
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
@@ -16,6 +16,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -40,8 +41,16 @@
    @Resource
    private UPWarnMsgAdptInfoDao upWarnMsgAdptInfoDao;
    
    @Resource
    private RedisTemplate redisTemplate;
    
    public void up_warn_msg(ChannelHandlerContext ctx, OuterPacket out) {
        if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
            log.error("链路还未登录校验,拒绝连接");
            ctx.close();
            return;
        }
        WarnMsg warnMsg = getWarnMsg(out);
        DataType dataType = DataType.getDataType(warnMsg.getDataType());
        switch (dataType) {
@@ -52,18 +61,6 @@
            case UP_WARN_MSG_ADPT_INFO:
                log.info("上报报警信息({}):{}", DataType.UP_WARN_MSG_ADPT_INFO.getCode(), out);
                up_warn_msg_adpt_info(ctx, out.getGnsscenterId(), warnMsg);
                break;
            case UP_CTRL_MSG_MONITOR_VEHICLE_ACK:
                log.info("车辆单向监听应答消息({}):{}", DataType.UP_CTRL_MSG_MONITOR_VEHICLE_ACK.getCode(), out);
                break;
            case UP_CTRL_MSG_TAKE_PHOTO_ACK:
                log.info("车辆牌照应答消息({}):{}", DataType.UP_CTRL_MSG_TAKE_PHOTO_ACK.getCode(), out);
                break;
            case UP_CTRL_MSG_TEXT_INFO_ACK:
                log.info("下发车辆报文应答消息({}):{}", DataType.UP_CTRL_MSG_TEXT_INFO_ACK.getCode(), out);
                break;
            case UP_CTRL_MSG_EMERGENCY_MONITORING_ACK:
                log.info("车辆应急接入监管平台应答消息({}):{}", DataType.UP_CTRL_MSG_EMERGENCY_MONITORING_ACK.getCode(), out);
                break;
            default:
                break;
@@ -135,7 +132,7 @@
     * 定时任务督办报警请求
     */
    public void taskUrgeTodo() {
        List<UPWarnMsgAdptInfo> list = upWarnMsgAdptInfoDao.findByResultIs(0x00);
        List<UPWarnMsgAdptInfo> list = upWarnMsgAdptInfoDao.findByResultIsAndPushTimeBefore(0x00, LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
        for (UPWarnMsgAdptInfo upWarnMsgAdptInfo : list) {
            down_warn_msg_urge_todo_req(upWarnMsgAdptInfo);
        }
@@ -146,19 +143,23 @@
     * 报警督办请求
     */
    public void down_warn_msg_urge_todo_req(UPWarnMsgAdptInfo upWarnMsgAdptInfo) {
        if (!redisTemplate.hasKey("login:" + upWarnMsgAdptInfo.getInferiorPlatformId())) {
            log.error("链路还未登录校验,拒绝连接");
            return;
        }
        int inferiorPlatformId = upWarnMsgAdptInfo.getInferiorPlatformId();
        DOWNWarnMsgUrgeTodoReq downWarnMsgUrgeTodoReq = new DOWNWarnMsgUrgeTodoReq().build(upWarnMsgAdptInfo);
        log.info("报警督办请求({}):{}", DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), JSON.toJSONString(downWarnMsgUrgeTodoReq));
        downWarnMsgUrgeTodoReq.setDataType(DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode());
        downWarnMsgUrgeTodoReq.setDataLength(92);
        byte[] body = downWarnMsgUrgeTodoReq.encode();
        OuterPacket out = new OuterPacket(DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), body);
        //获取从链路通道
        Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
        if (null != channel && channel.isActive()) {
            channel.writeAndFlush(out);
        } else {
            channel = ChannelMap.getServerChannel(inferiorPlatformId);
            channel.writeAndFlush(out);
            log.info("报警督办请求({}):{}", DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), JSON.toJSONString(downWarnMsgUrgeTodoReq));
        }
        upWarnMsgAdptInfo.setPushTime(LocalDateTime.now().plusDays(7).toEpochSecond(ZoneOffset.ofHours(8)));
        upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
    }
}