ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/RuoYiDataInterchangeApplication.java
@@ -6,12 +6,12 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; /** * Spring Boot entry point of the data interchange module: starts the application context and prints a startup banner. * * @author ruoyi */ @EnableCustomConfig @@ -22,8 +22,10 @@ @SpringBootApplication @EnableTransactionManagement//enable transaction management public class RuoYiDataInterchangeApplication { public static void main(String[] args) { SpringApplication.run(RuoYiDataInterchangeApplication.class, args); System.out.println("(♥◠‿◠)ノ゙ 数据交互模块启动成功 ლ(´ڡ`ლ)゙ "); } public static void main(String[] args) { SpringApplication.run(RuoYiDataInterchangeApplication.class, args); System.out.println("(♥◠‿◠)ノ゙ 数据交互模块启动成功 ლ(´ڡ`ლ)゙ "); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/dao/UPWarnMsgAdptInfoDao.java
@@ -4,10 +4,21 @@ import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; import java.util.List; /** * Elasticsearch repository for {@link UPWarnMsgAdptInfo} documents with {@code Long} ids. * * @author zhibing.pu * @Date 2025/3/7 19:30 */ @Repository public interface UPWarnMsgAdptInfoDao extends ElasticsearchRepository<UPWarnMsgAdptInfo, Long> { /** * Queries the stored records by handling status. * * @param result handling-status value to look up (presumably matched against the entity's {@code result} field through Spring Data's method-name query derivation; confirm) * @return the records found for that status */ List<UPWarnMsgAdptInfo> findByResultIs(int result); } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNConnectReq.java
@@ -19,7 +19,7 @@ private int verifyCode; public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(1); ByteBuf byteBuf = Unpooled.buffer(4); byteBuf.writeInt(this.getVerifyCode()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNDisconnectInform.java
@@ -1,6 +1,5 @@ package com.ruoyi.dataInterchange.model; import com.fasterxml.jackson.annotation.JsonProperty; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -8,6 +7,7 @@ /** * 从链路断开通知 * * @author zhibing.pu * @Date 2025/2/24 11:12 */ @@ -24,7 +24,7 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(1); byteBuf.writeInt(this.getErrorCode()); byteBuf.writeByte(this.getErrorCode()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); return bytes; ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNExgMsgReportDriverInfo.java
@@ -37,11 +37,15 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(28); byte[] bytes1 = this.getVehicleNo().getBytes(); for (byte b : bytes1) { byteBuf.writeByte(b); for (int i = 0; i < 21; i++) { if (i < bytes1.length) { byteBuf.writeByte(bytes1[i]); } else { byteBuf.writeByte(0x00); } } byteBuf.writeByte(this.getVehicleColor()); byteBuf.writeByte(this.getDataType()); byteBuf.writeShort(this.getDataType()); byteBuf.writeInt(this.getDataLength()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNExgMsgTakeEwaybillReq.java
@@ -7,6 +7,7 @@ /** * 上报车辆电子运单请求 * * @author zhibing.pu * @Date 2025/3/3 11:06 */ @@ -36,11 +37,15 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(28); byte[] bytes1 = this.getVehicleNo().getBytes(); for (byte b : bytes1) { byteBuf.writeByte(b); for (int i = 0; i < 21; i++) { if (i < bytes1.length) { byteBuf.writeByte(bytes1[i]); } else { byteBuf.writeByte(0x00); } } byteBuf.writeByte(this.getVehicleColor()); byteBuf.writeByte(this.getDataType()); byteBuf.writeShort(this.getDataType()); byteBuf.writeInt(this.getDataLength()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNWarnMsgUrgeTodoReq.java
@@ -100,8 +100,12 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(120); byte[] bytes1 = this.getVehicleNo().getBytes(); for (byte b : bytes1) { byteBuf.writeByte(b); for (int i = 0; i < 21; i++) { if (i < bytes1.length) { byteBuf.writeByte(bytes1[i]); } else { byteBuf.writeByte(0x00); } } byteBuf.writeByte(this.getVehicleColor()); byteBuf.writeShort(this.getDataType()); @@ -113,16 +117,28 @@ byteBuf.writeLong(this.getSupervisionEndTime()); byteBuf.writeByte(this.getSupervisionLevel()); byte[] bytes2 = this.getSupervisor().getBytes(); for (byte b : bytes2) { byteBuf.writeByte(b); for (int i = 0; i < 16; i++) { if (i < bytes2.length) { byteBuf.writeByte(bytes2[i]); } else { byteBuf.writeByte(0x00); } } byte[] bytes3 = this.getSupervisorTel().getBytes(); for (byte b : bytes3) { byteBuf.writeByte(b); for (int i = 0; i < 20; i++) { if (i < bytes3.length) { byteBuf.writeByte(bytes3[i]); } else { byteBuf.writeByte(0x00); } } byte[] bytes4 = this.getSupervisorEmail().getBytes(); for (byte b : bytes4) { byteBuf.writeByte(b); for (int i = 0; i < 32; i++) { if (i < bytes4.length) { byteBuf.writeByte(bytes4[i]); } else { byteBuf.writeByte(0x00); } } byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/UPWarnMsgAdptInfo.java
@@ -75,6 +75,15 @@ */ @Field(type = FieldType.Text) private String infoContent; /** * 报警处理结果 * 0x00:处理中 * 0x01:已处理完毕 * 0x02:不作处理 * 0x03:将来处理 */ @Field(type = FieldType.Integer) private int result; /** ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java
@@ -1,9 +1,14 @@ package com.ruoyi.dataInterchange.netty.client; import com.ruoyi.dataInterchange.netty.server.NettyHandle; import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder; import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder; import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -11,8 +16,6 @@ import io.netty.handler.timeout.IdleStateHandler; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -30,8 +33,6 @@ private int port; public NettyClient(String host, int port) { this.host = host; this.port = port; @@ -40,6 +41,7 @@ /** * 执行启动连接 * * @throws Exception */ public void start(int code) throws Exception { @@ -62,9 +64,9 @@ // 编码器 pipeline.addLast("encoder", new Jtt809Encoder()); // 空闲检测处理器 触发空闲状态事件 读空闲:5秒 写空闲:7秒 读写空闲:10秒 pipeline.addLast(new IdleStateHandler(5,7,3, TimeUnit.SECONDS)); pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS)); // 处理器 pipeline.addLast("handler", new NettyHandle()); // pipeline.addLast("handler", new NettyHandle()); } }); // • 调用Bootstrap.connect()来连接服务器 @@ -80,10 +82,73 @@ } public static void main(String[] args) { try { new NettyClient("221.182.45.100", 1000).start(1); } catch (Exception e) { throw new RuntimeException(e); byte[] bytess = ByteArrayUtil.hexStr2Bytes("5B0000001F0000002B1002010A66F80100000000000000000000200C8E075D"); ByteBuf msg = Unpooled.wrappedBuffer(bytess); byte[] readableBytes = new byte[msg.readableBytes()]; msg.readBytes(readableBytes); // 反转义处理 byte[] bytes 
= Jtt809Util.unescape(readableBytes); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); // 判断包头 if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) { byteBuf.resetReaderIndex(); return; } // crc校验 // if (!Jtt809Util.validate(byteBuf)) { // return; // } /* 解析外层包 */ // 长度 long length = byteBuf.readUnsignedInt(); // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比; // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等 // if (length != bytes.length) { // log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); // return; // } // 报文序列号 long sn = byteBuf.readUnsignedInt(); // 业务数据类型 int id = byteBuf.readUnsignedShort(); // 下级平台接入码 int gnsscenterId = byteBuf.readInt(); // 协议版本号标识 String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." + byteBuf.readByte(); // 报文加密标识位 byte encryptFlag = byteBuf.readByte(); // 数据加密解密的密匙 long encryptKey = byteBuf.readUnsignedInt(); // 2019版 // String date = byteBuf.readByte() + "-" + byteBuf.readByte() + "-" + byteBuf.readShort() + " " + // byteBuf.readByte() + ":" + byteBuf.readByte() + ":" + byteBuf.readByte(); // long time = 0; // try { // time = DateUtils.parseDate(date, "dd-MM-yyyy HH:mm:ss").getTime(); // } catch (ParseException e) { // log.warn("日期 [{}] 解析错误", date); // } // 消息体 byte[] body; if (encryptFlag == 1) { byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(encryptedBytes); // 解密 int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId); Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes); body = encryptedBytes; } else { body = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(body); } ByteBuf buf = Unpooled.wrappedBuffer(body); int result = buf.readByte(); int verifyCode = buf.readInt(); // 校验码 int crcCode = byteBuf.readUnsignedShort(); //{"result":0,"verifyCode":658} } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
@@ -75,8 +75,8 @@ if (upConnectRsp.getResult() == 0x00) { //保存链路 ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel().id()); //从链路连接 downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); //从链路连接 TODO 临时注释掉 // downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); } else { ctx.close(); } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
@@ -21,7 +21,6 @@ import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; /** @@ -125,6 +124,7 @@ */ public void up_warn_msg_adpt_info(ChannelHandlerContext ctx, int inferiorPlatformId, WarnMsg warnMsg) { UPWarnMsgAdptInfo upWarnMsgAdptInfo = new UPWarnMsgAdptInfo().decode(warnMsg); upWarnMsgAdptInfo.setResult(0x00); upWarnMsgAdptInfo.setInferiorPlatformId(inferiorPlatformId); upWarnMsgAdptInfo.setCreateTime(LocalDateTime.now()); upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo); @@ -135,8 +135,7 @@ * 定时任务督办报警请求 */ public void taskUrgeTodo() { upWarnMsgAdptInfoDao.findAll() List<UPWarnMsgAdptInfo> list = new ArrayList<>(); List<UPWarnMsgAdptInfo> list = upWarnMsgAdptInfoDao.findByResultIs(0x00); for (UPWarnMsgAdptInfo upWarnMsgAdptInfo : list) { down_warn_msg_urge_todo_req(upWarnMsgAdptInfo); } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/jtt809/decoder/Jtt809Decoder.java
@@ -4,16 +4,15 @@ import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService; import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;import io.netty.buffer.ByteBuf; import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateUtils; import java.text.ParseException; import java.util.List; /** @@ -21,54 +20,54 @@ */ @Slf4j public class Jtt809Decoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { // 写个判断,线上环境就不需要执行 ByteBufUtil.hexDump if (log.isDebugEnabled()) { log.debug("收到一条消息:{}5d", ByteBufUtil.hexDump(msg)); } byte[] readableBytes = new byte[msg.readableBytes()]; msg.readBytes(readableBytes); log.info("接收到数据包, packetLen : {}, packet : {}", readableBytes.length, ByteArrayUtil.bytes2HexStr(readableBytes)); // 反转义处理 byte[] bytes = Jtt809Util.unescape(readableBytes); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); log.info("反转义后数据包, packetLen : {}, packet : {}", bytes.length, ByteArrayUtil.bytes2HexStr(bytes)); // 判断包头 if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) { byteBuf.resetReaderIndex(); log.warn("消息包头错误: {}5d", ByteBufUtil.hexDump(byteBuf)); return; } // crc校验 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { // 写个判断,线上环境就不需要执行 ByteBufUtil.hexDump if (log.isDebugEnabled()) { log.debug("收到一条消息:{}5d", ByteBufUtil.hexDump(msg)); } byte[] readableBytes = new byte[msg.readableBytes()]; 
msg.readBytes(readableBytes); log.info("接收到数据包, packetLen : {}, packet : {}", readableBytes.length, ByteArrayUtil.bytes2HexStr(readableBytes)); // 反转义处理 byte[] bytes = Jtt809Util.unescape(readableBytes); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); log.info("反转义后数据包, packetLen : {}, packet : {}", bytes.length, ByteArrayUtil.bytes2HexStr(bytes)); // 判断包头 if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) { byteBuf.resetReaderIndex(); log.warn("消息包头错误: {}5d", ByteBufUtil.hexDump(byteBuf)); return; } // crc校验 // if (!Jtt809Util.validate(byteBuf)) { // return; // } /* 解析外层包 */ // 长度 long length = byteBuf.readUnsignedInt(); // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比; // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等 if (length != bytes.length) { log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); return; } // 报文序列号 long sn = byteBuf.readUnsignedInt(); // 业务数据类型 int id = byteBuf.readUnsignedShort(); // 下级平台接入码 int gnsscenterId = byteBuf.readInt(); ctx.channel().attr(Jtt809Constant.NettyAttribute.GNSS_CENTER_ID).setIfAbsent(String.valueOf(gnsscenterId)); // 协议版本号标识 String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." + byteBuf.readByte(); // 报文加密标识位 byte encryptFlag = byteBuf.readByte(); // 数据加密解密的密匙 long encryptKey = byteBuf.readUnsignedInt(); // 2019版 /* 解析外层包 */ // 长度 long length = byteBuf.readUnsignedInt(); // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比; // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等 // if (length != bytes.length) { // log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); // return; // } // 报文序列号 long sn = byteBuf.readUnsignedInt(); // 业务数据类型 int id = byteBuf.readUnsignedShort(); // 下级平台接入码 int gnsscenterId = byteBuf.readInt(); ctx.channel().attr(Jtt809Constant.NettyAttribute.GNSS_CENTER_ID).setIfAbsent(String.valueOf(gnsscenterId)); // 协议版本号标识 String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." 
+ byteBuf.readByte(); // 报文加密标识位 byte encryptFlag = byteBuf.readByte(); // 数据加密解密的密匙 long encryptKey = byteBuf.readUnsignedInt(); // 2019版 // String date = byteBuf.readByte() + "-" + byteBuf.readByte() + "-" + byteBuf.readShort() + " " + // byteBuf.readByte() + ":" + byteBuf.readByte() + ":" + byteBuf.readByte(); // long time = 0; @@ -77,24 +76,25 @@ // } catch (ParseException e) { // log.warn("日期 [{}] 解析错误", date); // } // 消息体 byte[] body; if (encryptFlag == 1) { byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(encryptedBytes); // 解密 int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId); Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes); body = encryptedBytes; } else { body = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(body); } // 校验码 int crcCode = byteBuf.readUnsignedShort(); ReferenceCountUtil.release(byteBuf); out.add(new OuterPacket(length, sn, id, gnsscenterId, version, encryptFlag, encryptKey, body, crcCode)); } // 消息体 byte[] body; if (encryptFlag == 1) { byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(encryptedBytes); // 解密 int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId); Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes); body = encryptedBytes; } else { body = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(body); } // 校验码 int crcCode = byteBuf.readUnsignedShort(); ReferenceCountUtil.release(byteBuf); out.add(new OuterPacket(length, sn, id, gnsscenterId, version, encryptFlag, encryptKey, body, crcCode)); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/jtt809/encoder/Jtt809Encoder.java
@@ -1,5 +1,6 @@ package com.ruoyi.dataInterchange.util.jtt809.encoder; import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil; import com.ruoyi.dataInterchange.util.jtt809.common.CRC16CCITT; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; @@ -8,71 +9,72 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.ZoneOffset; import lombok.extern.slf4j.Slf4j; /** * @author tucke */ @Slf4j public class Jtt809Encoder extends MessageToByteEncoder<OuterPacket> { @Override protected void encode(ChannelHandlerContext ctx, OuterPacket packet, ByteBuf out) throws Exception { if (packet == null) { return; } int gnsscenterId; if (ctx.channel().hasAttr(Jtt809Constant.NettyAttribute.GNSS_CENTER_ID)) { gnsscenterId = Integer.parseInt(ctx.channel().attr(Jtt809Constant.NettyAttribute.GNSS_CENTER_ID).get()); } else { gnsscenterId = packet.getGnsscenterId(); } byte[] body = packet.getBody(); if (body == null) { body = new byte[0]; } // 32 = 头标识[1] + 数据头[30 = 长度[4] + 序列号[4] + 数据类型[2] + 接入码[4] + 版本号[3] + 加密标识[1] + 密钥[4]] + 时间[8] + 尾标识[1] int len = body.length + 32; out.markReaderIndex(); // 数据长度 out.writeInt(len); // 序列号 out.writeInt(GnssCenterService.getInstance().serialNo(gnsscenterId)); // 业务数据类型 out.writeShort(packet.getId()); // 下级平台接入码 out.writeInt(gnsscenterId); // 版本号 out.writeByte(1); out.writeByte(0); out.writeByte(0); // 报文加密标识位 out.writeByte(0); // 数据加密的密钥 out.writeInt(0); // 数据体 out.writeBytes(body); // 校验码 byte[] crcBytes = new byte[out.readableBytes()]; out.readBytes(crcBytes); out.writeShort(CRC16CCITT.crc16(crcBytes)); // 转义 out.resetReaderIndex(); byte[] escapeBytes = new byte[out.readableBytes()]; out.readBytes(escapeBytes); // 重置下标 out.setIndex(0, 0); // 包头标识 out.writeByte(Jtt809Constant.PACKET_HEAD_FLAG); // 
数据内容 out.writeBytes(Jtt809Util.escape(escapeBytes)); // 包尾标识 out.writeByte(Jtt809Constant.PACKET_END_FLAG); } @Override protected void encode(ChannelHandlerContext ctx, OuterPacket packet, ByteBuf out) throws Exception { if (packet == null) { return; } int gnsscenterId; if (ctx.channel().hasAttr(Jtt809Constant.NettyAttribute.GNSS_CENTER_ID)) { gnsscenterId = Integer.parseInt(ctx.channel().attr(Jtt809Constant.NettyAttribute.GNSS_CENTER_ID).get()); } else { gnsscenterId = packet.getGnsscenterId(); } byte[] body = packet.getBody(); if (body == null) { body = new byte[0]; } // 26 = 头标识[1] + 数据头[22 = 长度[4] + 序列号[4] + 数据类型[2] + 接入码[4] + 版本号[3] + 加密标识[1] + 密钥[4]] + 校验码[2] + 尾标识[1] int len = body.length + 26; out.markReaderIndex(); // 数据长度 out.writeInt(len); // 序列号 out.writeInt(GnssCenterService.getInstance().serialNo(gnsscenterId)); // 业务数据类型 out.writeShort(packet.getId()); // 下级平台接入码 out.writeInt(gnsscenterId); // 版本号 out.writeByte(1); out.writeByte(0); out.writeByte(0); // 报文加密标识位 out.writeByte(0); // 数据加密的密钥 out.writeInt(0); // 数据体 out.writeBytes(body); // 校验码 byte[] crcBytes = new byte[out.readableBytes()]; out.readBytes(crcBytes); out.writeShort(CRC16CCITT.crc16(crcBytes)); // 转义 out.resetReaderIndex(); byte[] escapeBytes = new byte[out.readableBytes()]; out.readBytes(escapeBytes); // 重置下标 out.setIndex(0, 0); // 包头标识 out.writeByte(Jtt809Constant.PACKET_HEAD_FLAG); // 数据内容 out.writeBytes(Jtt809Util.escape(escapeBytes)); // 包尾标识 out.writeByte(Jtt809Constant.PACKET_END_FLAG); byte[] readableBytes = new byte[out.readableBytes()]; out.readBytes(readableBytes); log.info("下发数据包, packetLen : {}, packet : {}", readableBytes.length, ByteArrayUtil.bytes2HexStr(readableBytes)); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/jtt809/packet/common/OuterPacket.java
@@ -9,65 +9,64 @@ /** * Outer packet model of the jtt809 utility package; holds the header fields, body bytes and CRC code declared below. * * @author tucke */ @SuppressWarnings("SpellCheckingInspection") @AllArgsConstructor @NoArgsConstructor @Data public class OuterPacket implements Serializable { private static final long serialVersionUID = 1L; /** * Data length (including the head flag, data header, data body and tail flag). */ private long length; /** * Message sequence number. * Occupies four bytes; it is the sequence number of the sent message, used by the receiver to detect lost messages. The superior and inferior platforms each count the packets they send themselves, independently of each other. * It is zero when the program starts, begins counting with the first frame sent, and wraps back to zero after reaching the maximum. */ private long sn; /** * Business data type. */ private int id; /** * Access code of the inferior platform: the unique identifier assigned to it by the superior platform. */ private int gnsscenterId; /** * Protocol version identifier: the number of the standard protocol version used between superior and inferior platforms. * Represented with 3 bytes; 0x01 0x02 0x0F denotes version v1.2.15, and so on. */ private String version; /** * Message encryption flag. * 0 - the message is not encrypted. * 1 - the message is encrypted; the data body of the business data that follows is encrypted with the key corresponding to ENCRYPT_KEY. */ private byte encryptFlag; /** * Key used to encrypt and decrypt the data, 4 bytes long. */ private long encryptKey; /** * Message body. */ private byte[] body; /** * CRC check code of the data. */ private int crcCode; /** * Creates a packet with only the business data type and the body set. */ public OuterPacket(int id, byte[] body) { this.id = id; this.body = body; } /** * Creates a packet with the business data type, the access code and the body set. */ public OuterPacket(int id, int gnsscenterId, byte[] body) { this.id = id; this.gnsscenterId = gnsscenterId; this.body = body; } private static final long serialVersionUID = 1L; /** * Data length (including the head flag, data header, data body and tail flag). */ private long length; /** * Message sequence number. * Occupies four bytes; it is the sequence number of the sent message, used by the receiver to detect lost messages. The superior and inferior platforms each count the packets they send themselves, independently of each other. * It is zero when the program starts, begins counting with the first frame sent, and wraps back to zero after reaching the maximum. */ private long sn; /** * Business data type. */ private int id; /** * Access code of the inferior platform: the unique identifier assigned to it by the superior platform. */ private int gnsscenterId; /** * Protocol version identifier: the number of the standard protocol version used between superior and inferior platforms. * Represented with 3 bytes; 0x01 0x02 0x0F denotes version v1.2.15, and so on. */ private String version; /** * Message encryption flag. * 0 - the message is not encrypted. * 1 - the message is encrypted; the data body of the business data that follows is encrypted with the key corresponding to ENCRYPT_KEY. */ private byte encryptFlag; /** * Key used to encrypt and decrypt the data, 4 bytes long. */ private long encryptKey; /** * Message body. */ private byte[] body; /** * CRC check code of the data. */ private int crcCode; /** * Creates a packet with only the business data type and the body set. */ public OuterPacket(int id, byte[] body) { this.id = id; this.body = body; } /** * Creates a packet with the business data type, the access code and the body set. */ public OuterPacket(int id, int gnsscenterId, byte[] body) { this.id = id; this.gnsscenterId = gnsscenterId; this.body = body; } } 
ruoyi-service/ruoyi-dataInterchange/src/test/java/com/ruoyi/dataInterchange/RuoYiOrderApplicationTests.java
New file @@ -0,0 +1,79 @@ //package com.ruoyi.dataInterchange; // //import com.ruoyi.dataInterchange.model.UPConnectRsp; //import com.ruoyi.dataInterchange.model.enu.DataType; //import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder; //import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder; //import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; //import io.netty.bootstrap.Bootstrap; //import io.netty.channel.*; //import io.netty.channel.nio.NioEventLoopGroup; //import io.netty.channel.socket.SocketChannel; //import io.netty.channel.socket.nio.NioSocketChannel; //import io.netty.handler.timeout.IdleStateHandler; //import org.junit.jupiter.api.Test; //import org.springframework.boot.test.context.SpringBootTest; // //import java.net.InetSocketAddress; //import java.util.concurrent.TimeUnit; // // //@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RuoYiDataInterchangeApplication.class) //public class RuoYiOrderApplicationTests { // // @Test // public void test() { // EventLoopGroup nioEventLoopGroup = null; // try { // //创建Bootstrap对象用来引导启动客户端 // Bootstrap bootstrap = new Bootstrap(); // //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据 // nioEventLoopGroup = new NioEventLoopGroup(); // //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址 // bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress("127.0.0.1", 1000)) // .handler(new ChannelInitializer<SocketChannel>() { // //添加一个ChannelHandler,客户端成功连接服务器后就会被执行 // @Override // protected void initChannel(SocketChannel socketChannel) // throws Exception { // ChannelPipeline pipeline = socketChannel.pipeline(); // // 解码器 // pipeline.addLast("decoder", new Jtt809Decoder()); // // 编码器 // pipeline.addLast("encoder", new Jtt809Encoder()); // // 空闲检测处理器 触发空闲状态事件 读空闲:5秒 写空闲:7秒 读写空闲:10秒 // pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS)); // // 
处理器 //// pipeline.addLast("handler", new NettyHandle()); // } // }); // // • 调用Bootstrap.connect()来连接服务器 // ChannelFuture f = bootstrap.connect().sync(); // //将通道添加到缓存中,便于后期直接使用 // Channel channel = f.channel(); // // • 最后关闭EventLoopGroup来释放资源 // f.channel().closeFuture().sync(); // // // //解析封装原元数据 // UPConnectRsp upConnectRsp = new UPConnectRsp(); // upConnectRsp.setResult(0x00); // // 随机一个校验码 // upConnectRsp.setVerifyCode(1234); // //主链路登录应答 // byte[] body = upConnectRsp.encode(); // OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(), body); // channel.writeAndFlush(out); // channel.flush(); // // } catch (Exception e) { // e.printStackTrace(); // } finally { // try { // nioEventLoopGroup.shutdownGracefully().sync(); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // } // } //} ruoyi-service/ruoyi-dataInterchange/src/test/java/com/ruoyi/dataInterchange/order/RuoYiOrderApplicationTests.java
File was deleted