From 4ad32c37e3317fb6cb7cbfb034b53d3220c27db4 Mon Sep 17 00:00:00 2001 From: puzhibing <393733352@qq.com> Date: 星期五, 30 五月 2025 17:59:16 +0800 Subject: [PATCH] 优化报警 --- ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 103 insertions(+), 13 deletions(-) diff --git a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java index 0237e60..73289b8 100644 --- a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java +++ b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java @@ -1,24 +1,34 @@ package com.ruoyi.dataInterchange.netty.client; -import com.ruoyi.dataInterchange.netty.server.NettyHandle; +import com.alibaba.fastjson.JSON; +import com.ruoyi.dataInterchange.model.DOWNConnectReq; +import com.ruoyi.dataInterchange.model.enu.DataType; +import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil; +import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; +import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder; import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder; +import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService; +import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; +import com.ruoyi.dataInterchange.wapper.UPConnect; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; /** * @author zhibing.pu * @Date 2025/3/3 20:11 */ +@Slf4j public class NettyClient { /** * 连接IP @@ -30,8 +40,6 @@ private int port; - - public NettyClient(String host, int port) { this.host = host; this.port = port; @@ -40,9 +48,10 @@ /** * 执行启动连接 + * * @throws Exception */ - public void start(int code) throws Exception { + public void start(int inferiorPlatformId, int verifyCode) throws Exception { EventLoopGroup nioEventLoopGroup = null; try { //创建Bootstrap对象用来引导启动客户端 @@ -62,16 +71,34 @@ // 编码器 pipeline.addLast("encoder", new Jtt809Encoder()); // 空闲检测处理器 触发空闲状态事件 读空闲:5秒 写空闲:7秒 读写空闲:10秒 - pipeline.addLast(new IdleStateHandler(5,7,3, TimeUnit.SECONDS)); + pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS)); // 处理器 - pipeline.addLast("handler", new NettyHandle()); + pipeline.addLast("handler", new NettyClientHandler()); } }); // • 调用Bootstrap.connect()来连接服务器 ChannelFuture f = bootstrap.connect().sync(); //将通道添加到缓存中,便于后期直接使用 Channel channel = f.channel(); - ChannelMap.addClientChannel(code, channel.id()); + ChannelMap.addClientChannel(inferiorPlatformId, channel); + log.info("从链路连接成功"); + + //构建从链路请求 + DOWNConnectReq downConnectReq = new DOWNConnectReq(); + downConnectReq.setVerifyCode(verifyCode); + log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq)); + byte[] body = downConnectReq.encode(); + OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), inferiorPlatformId, body); + channel.writeAndFlush(out); + channel.flush(); + + //缓存从链路地址 + UPConnect upConnect = new UPConnect(); + upConnect.setDownLinkIp(host); + upConnect.setDownLinkPort(port); + upConnect.setVerifyCode(verifyCode); + ChannelMap.addIpAndPort(inferiorPlatformId, upConnect); + // • 最后关闭EventLoopGroup来释放资源 f.channel().closeFuture().sync(); } finally { @@ -80,10 +107,73 @@ } public static void main(String[] args) { - try { - new NettyClient("221.182.45.100", 1000).start(1); - } catch (Exception e) { - throw new RuntimeException(e); + byte[] bytess = ByteArrayUtil.hexStr2Bytes("5B0000001F0000002B1002010A66F80100000000000000000000200C8E075D"); + ByteBuf msg = Unpooled.wrappedBuffer(bytess); + + byte[] readableBytes = new byte[msg.readableBytes()]; + msg.readBytes(readableBytes); + // 反转义处理 + byte[] bytes = Jtt809Util.unescape(readableBytes); + ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); + // 判断包头 + if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) { + byteBuf.resetReaderIndex(); + return; } + // crc校验 +// if (!Jtt809Util.validate(byteBuf)) { +// return; +// } + + /* 解析外层包 */ + // 长度 + long length = byteBuf.readUnsignedInt(); + // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比; + // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等 +// if (length != bytes.length) { +// log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); +// return; +// } + // 报文序列号 + long sn = byteBuf.readUnsignedInt(); + // 业务数据类型 + int id = byteBuf.readUnsignedShort(); + // 下级平台接入码 + int gnsscenterId = byteBuf.readInt(); + // 协议版本号标识 + String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." + byteBuf.readByte(); + // 报文加密标识位 + byte encryptFlag = byteBuf.readByte(); + // 数据加密解密的密匙 + long encryptKey = byteBuf.readUnsignedInt(); + // 2019版 +// String date = byteBuf.readByte() + "-" + byteBuf.readByte() + "-" + byteBuf.readShort() + " " + +// byteBuf.readByte() + ":" + byteBuf.readByte() + ":" + byteBuf.readByte(); +// long time = 0; +// try { +// time = DateUtils.parseDate(date, "dd-MM-yyyy HH:mm:ss").getTime(); +// } catch (ParseException e) { +// log.warn("日期 [{}] 解析错误", date); +// } + + // 消息体 + byte[] body; + if (encryptFlag == 1) { + byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2]; + byteBuf.readBytes(encryptedBytes); + // 解密 + int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId); + Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes); + body = encryptedBytes; + } else { + body = new byte[byteBuf.readableBytes() - 2]; + byteBuf.readBytes(body); + } + ByteBuf buf = Unpooled.wrappedBuffer(body); + int result = buf.readByte(); + int verifyCode = buf.readInt(); + // 校验码 + int crcCode = byteBuf.readUnsignedShort(); + //{"result":0,"verifyCode":658} } } -- Gitblit v1.7.1