Pu Zhibing
41 分钟以前 65a12fccec8ad0306817eb057adb102c54f2046f
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java
@@ -1,24 +1,34 @@
package com.ruoyi.dataInterchange.netty.client;
import com.ruoyi.dataInterchange.netty.server.NettyHandle;
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder;
import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder;
import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * @author zhibing.pu
 * @Date 2025/3/3 20:11
 */
@Slf4j
public class NettyClient {
   /**
    * 连接IP
@@ -30,8 +40,6 @@
   private int port;
   
   
   public NettyClient(String host, int port) {
      this.host = host;
      this.port = port;
@@ -40,9 +48,10 @@
   
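   /**
    * Starts the client: connects to the remote endpoint, sends the down-link
    * connect request, and then blocks until the channel is closed.
    *
    * @throws Exception if connecting or waiting on the channel fails or is interrupted
    */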
   public void start(int code) throws Exception {
   public void start(int inferiorPlatformId, int verifyCode) throws Exception {
      EventLoopGroup nioEventLoopGroup = null;
      try {
         //创建Bootstrap对象用来引导启动客户端
@@ -62,16 +71,34 @@
                     // 编码器
                     pipeline.addLast("encoder", new Jtt809Encoder());
                     // Idle-state detection, fires idle events: reader idle 5s, writer idle 7s, all (read+write) idle 3s
                     pipeline.addLast(new IdleStateHandler(5,7,3, TimeUnit.SECONDS));
                     pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS));
                     // 处理器
                     pipeline.addLast("handler", new NettyHandle());
                     pipeline.addLast("handler", new NettyClientHandler());
                  }
               });
         // Call Bootstrap.connect() to connect to the server and wait for the attempt to complete
         ChannelFuture f = bootstrap.connect().sync();
         //将通道添加到缓存中,便于后期直接使用
         Channel channel = f.channel();
         ChannelMap.addClientChannel(code, channel.id());
         ChannelMap.addClientChannel(inferiorPlatformId, channel);
         log.info("从链路连接成功");
         //构建从链路请求
         DOWNConnectReq downConnectReq = new DOWNConnectReq();
         downConnectReq.setVerifyCode(verifyCode);
         log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
         byte[] body = downConnectReq.encode();
         OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), inferiorPlatformId, body);
         channel.writeAndFlush(out);
         channel.flush();
         //缓存从链路地址
         UPConnect upConnect = new UPConnect();
         upConnect.setDownLinkIp(host);
         upConnect.setDownLinkPort(port);
         upConnect.setVerifyCode(verifyCode);
         ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
         // Block until the channel is closed; releasing the EventLoopGroup is left to the finally block
         f.channel().closeFuture().sync();
      } finally {
@@ -80,10 +107,73 @@
   }
   
   public static void main(String[] args) {
      try {
         new NettyClient("221.182.45.100", 1000).start(1);
      } catch (Exception e) {
         throw new RuntimeException(e);
      byte[] bytess = ByteArrayUtil.hexStr2Bytes("5B0000001F0000002B1002010A66F80100000000000000000000200C8E075D");
      ByteBuf msg = Unpooled.wrappedBuffer(bytess);
      byte[] readableBytes = new byte[msg.readableBytes()];
      msg.readBytes(readableBytes);
      // 反转义处理
      byte[] bytes = Jtt809Util.unescape(readableBytes);
      ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
      // 判断包头
      if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) {
         byteBuf.resetReaderIndex();
         return;
      }
      // crc校验
//        if (!Jtt809Util.validate(byteBuf)) {
//            return;
//        }
      /* 解析外层包 */
      // 长度
      long length = byteBuf.readUnsignedInt();
      // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比;
      // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等
//        if (length != bytes.length) {
//            log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length);
//            return;
//        }
      // 报文序列号
      long sn = byteBuf.readUnsignedInt();
      // 业务数据类型
      int id = byteBuf.readUnsignedShort();
      // 下级平台接入码
      int gnsscenterId = byteBuf.readInt();
      // 协议版本号标识
      String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." + byteBuf.readByte();
      // 报文加密标识位
      byte encryptFlag = byteBuf.readByte();
      // 数据加密解密的密匙
      long encryptKey = byteBuf.readUnsignedInt();
      // 2019版
//        String date = byteBuf.readByte() + "-" + byteBuf.readByte() + "-" + byteBuf.readShort() + " " +
//                byteBuf.readByte() + ":" + byteBuf.readByte() + ":" + byteBuf.readByte();
//        long time = 0;
//        try {
//            time = DateUtils.parseDate(date, "dd-MM-yyyy HH:mm:ss").getTime();
//        } catch (ParseException e) {
//            log.warn("日期 [{}] 解析错误", date);
//        }
      // 消息体
      byte[] body;
      if (encryptFlag == 1) {
         byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2];
         byteBuf.readBytes(encryptedBytes);
         // 解密
         int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId);
         Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes);
         body = encryptedBytes;
      } else {
         body = new byte[byteBuf.readableBytes() - 2];
         byteBuf.readBytes(body);
      }
      ByteBuf buf = Unpooled.wrappedBuffer(body);
      int result = buf.readByte();
      int verifyCode = buf.readInt();
      // 校验码
      int crcCode = byteBuf.readUnsignedShort();
      //{"result":0,"verifyCode":658}
   }
}