Pu Zhibing
2025-04-25 f8a708d9d960db750c9dd029efbe4a9f63d53fe8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.ruoyi.dataInterchange.netty.client;
 
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder;
import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder;
import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
 
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
 
/**
 * @author zhibing.pu
 * @Date 2025/3/3 20:11
 */
@Slf4j
public class NettyClient {
    /**
     * 连接IP
     */
    private String host;
    /**
     * 连接端口号
     */
    private int port;
    
    
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    
    /**
     * 执行启动连接
     *
     * @throws Exception
     */
    public void start(int inferiorPlatformId, int verifyCode) throws Exception {
        EventLoopGroup nioEventLoopGroup = null;
        try {
            //创建Bootstrap对象用来引导启动客户端
            Bootstrap bootstrap = new Bootstrap();
            //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
            nioEventLoopGroup = new NioEventLoopGroup();
            //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址
            bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        //添加一个ChannelHandler,客户端成功连接服务器后就会被执行
                        @Override
                        protected void initChannel(SocketChannel socketChannel)
                                throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 解码器
                            pipeline.addLast("decoder", new Jtt809Decoder());
                            // 编码器
                            pipeline.addLast("encoder", new Jtt809Encoder());
                            //  空闲检测处理器 触发空闲状态事件            读空闲:5秒      写空闲:7秒  读写空闲:10秒
                            pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS));
                            // 处理器
                            pipeline.addLast("handler", new NettyClientHandler());
                        }
                    });
            // • 调用Bootstrap.connect()来连接服务器
            ChannelFuture f = bootstrap.connect().sync();
            //将通道添加到缓存中,便于后期直接使用
            Channel channel = f.channel();
            ChannelMap.addClientChannel(inferiorPlatformId, channel);
            log.info("从链路连接成功");
            
            //构建从链路请求
            DOWNConnectReq downConnectReq = new DOWNConnectReq();
            downConnectReq.setVerifyCode(verifyCode);
            log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
            byte[] body = downConnectReq.encode();
            OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), inferiorPlatformId, body);
            channel.writeAndFlush(out);
            channel.flush();
            
            //缓存从链路地址
            UPConnect upConnect = new UPConnect();
            upConnect.setDownLinkIp(host);
            upConnect.setDownLinkPort(port);
            upConnect.setVerifyCode(verifyCode);
            ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
            
            // • 最后关闭EventLoopGroup来释放资源
            f.channel().closeFuture().sync();
        } finally {
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }
    
    public static void main(String[] args) {
        byte[] bytess = ByteArrayUtil.hexStr2Bytes("5B0000001F0000002B1002010A66F80100000000000000000000200C8E075D");
        ByteBuf msg = Unpooled.wrappedBuffer(bytess);
        
        byte[] readableBytes = new byte[msg.readableBytes()];
        msg.readBytes(readableBytes);
        // 反转义处理
        byte[] bytes = Jtt809Util.unescape(readableBytes);
        ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
        // 判断包头
        if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) {
            byteBuf.resetReaderIndex();
            return;
        }
        // crc校验
//        if (!Jtt809Util.validate(byteBuf)) {
//            return;
//        }
        
        /* 解析外层包 */
        // 长度
        long length = byteBuf.readUnsignedInt();
        // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比;
        // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等
//        if (length != bytes.length) {
//            log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length);
//            return;
//        }
        // 报文序列号
        long sn = byteBuf.readUnsignedInt();
        // 业务数据类型
        int id = byteBuf.readUnsignedShort();
        // 下级平台接入码
        int gnsscenterId = byteBuf.readInt();
        // 协议版本号标识
        String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." + byteBuf.readByte();
        // 报文加密标识位
        byte encryptFlag = byteBuf.readByte();
        // 数据加密解密的密匙
        long encryptKey = byteBuf.readUnsignedInt();
        // 2019版
//        String date = byteBuf.readByte() + "-" + byteBuf.readByte() + "-" + byteBuf.readShort() + " " +
//                byteBuf.readByte() + ":" + byteBuf.readByte() + ":" + byteBuf.readByte();
//        long time = 0;
//        try {
//            time = DateUtils.parseDate(date, "dd-MM-yyyy HH:mm:ss").getTime();
//        } catch (ParseException e) {
//            log.warn("日期 [{}] 解析错误", date);
//        }
        
        // 消息体
        byte[] body;
        if (encryptFlag == 1) {
            byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2];
            byteBuf.readBytes(encryptedBytes);
            // 解密
            int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId);
            Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes);
            body = encryptedBytes;
        } else {
            body = new byte[byteBuf.readableBytes() - 2];
            byteBuf.readBytes(body);
        }
        ByteBuf buf = Unpooled.wrappedBuffer(body);
        int result = buf.readByte();
        int verifyCode = buf.readInt();
        // 校验码
        int crcCode = byteBuf.readUnsignedShort();
        //{"result":0,"verifyCode":658}
    }
}