ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/RuoYiDataInterchangeApplication.java
@@ -6,12 +6,12 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; /** * 账户模块 * * @author ruoyi */ @EnableCustomConfig @@ -22,6 +22,8 @@ @SpringBootApplication @EnableTransactionManagement//开启事务 public class RuoYiDataInterchangeApplication { public static void main(String[] args) { SpringApplication.run(RuoYiDataInterchangeApplication.class, args); System.out.println("(♥◠‿◠)ノ゙ 数据交互模块启动成功 ლ(´ڡ`ლ)゙ "); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/dao/UPWarnMsgAdptInfoDao.java
@@ -4,10 +4,21 @@ import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; import java.util.List; /** * @author zhibing.pu * @Date 2025/3/7 19:30 */ @Repository public interface UPWarnMsgAdptInfoDao extends ElasticsearchRepository<UPWarnMsgAdptInfo, Long> { /** * 根据处理状态查询数据 * * @param result * @return */ List<UPWarnMsgAdptInfo> findByResultIs(int result); } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNConnectReq.java
@@ -19,7 +19,7 @@ private int verifyCode; public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(1); ByteBuf byteBuf = Unpooled.buffer(4); byteBuf.writeInt(this.getVerifyCode()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNDisconnectInform.java
@@ -1,6 +1,5 @@ package com.ruoyi.dataInterchange.model; import com.fasterxml.jackson.annotation.JsonProperty; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -8,6 +7,7 @@ /** * 从链路断开通知 * * @author zhibing.pu * @Date 2025/2/24 11:12 */ @@ -24,7 +24,7 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(1); byteBuf.writeInt(this.getErrorCode()); byteBuf.writeByte(this.getErrorCode()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); return bytes; ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNExgMsgReportDriverInfo.java
@@ -37,11 +37,15 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(28); byte[] bytes1 = this.getVehicleNo().getBytes(); for (byte b : bytes1) { byteBuf.writeByte(b); for (int i = 0; i < 21; i++) { if (i < bytes1.length) { byteBuf.writeByte(bytes1[i]); } else { byteBuf.writeByte(0x00); } } byteBuf.writeByte(this.getVehicleColor()); byteBuf.writeByte(this.getDataType()); byteBuf.writeShort(this.getDataType()); byteBuf.writeInt(this.getDataLength()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNExgMsgTakeEwaybillReq.java
@@ -7,6 +7,7 @@ /** * 上报车辆电子运单请求 * * @author zhibing.pu * @Date 2025/3/3 11:06 */ @@ -36,11 +37,15 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(28); byte[] bytes1 = this.getVehicleNo().getBytes(); for (byte b : bytes1) { byteBuf.writeByte(b); for (int i = 0; i < 21; i++) { if (i < bytes1.length) { byteBuf.writeByte(bytes1[i]); } else { byteBuf.writeByte(0x00); } } byteBuf.writeByte(this.getVehicleColor()); byteBuf.writeByte(this.getDataType()); byteBuf.writeShort(this.getDataType()); byteBuf.writeInt(this.getDataLength()); byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/DOWNWarnMsgUrgeTodoReq.java
@@ -100,8 +100,12 @@ public byte[] encode() { ByteBuf byteBuf = Unpooled.buffer(120); byte[] bytes1 = this.getVehicleNo().getBytes(); for (byte b : bytes1) { byteBuf.writeByte(b); for (int i = 0; i < 21; i++) { if (i < bytes1.length) { byteBuf.writeByte(bytes1[i]); } else { byteBuf.writeByte(0x00); } } byteBuf.writeByte(this.getVehicleColor()); byteBuf.writeShort(this.getDataType()); @@ -113,16 +117,28 @@ byteBuf.writeLong(this.getSupervisionEndTime()); byteBuf.writeByte(this.getSupervisionLevel()); byte[] bytes2 = this.getSupervisor().getBytes(); for (byte b : bytes2) { byteBuf.writeByte(b); for (int i = 0; i < 16; i++) { if (i < bytes2.length) { byteBuf.writeByte(bytes2[i]); } else { byteBuf.writeByte(0x00); } } byte[] bytes3 = this.getSupervisorTel().getBytes(); for (byte b : bytes3) { byteBuf.writeByte(b); for (int i = 0; i < 20; i++) { if (i < bytes3.length) { byteBuf.writeByte(bytes3[i]); } else { byteBuf.writeByte(0x00); } } byte[] bytes4 = this.getSupervisorEmail().getBytes(); for (byte b : bytes4) { byteBuf.writeByte(b); for (int i = 0; i < 32; i++) { if (i < bytes4.length) { byteBuf.writeByte(bytes4[i]); } else { byteBuf.writeByte(0x00); } } byte[] bytes = ByteBufUtil.getBytes(byteBuf); byteBuf.release(); ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/model/UPWarnMsgAdptInfo.java
@@ -75,6 +75,15 @@ */ @Field(type = FieldType.Text) private String infoContent; /** * 报警处理结果 * 0x00:处理中 * 0x01:已处理完毕 * 0x02:不作处理 * 0x03:将来处理 */ @Field(type = FieldType.Integer) private int result; /** ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/netty/client/NettyClient.java
@@ -1,9 +1,14 @@ package com.ruoyi.dataInterchange.netty.client; import com.ruoyi.dataInterchange.netty.server.NettyHandle; import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder; import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder; import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -11,8 +16,6 @@ import io.netty.handler.timeout.IdleStateHandler; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -30,8 +33,6 @@ private int port; public NettyClient(String host, int port) { this.host = host; this.port = port; @@ -40,6 +41,7 @@ /** * 执行启动连接 * * @throws Exception */ public void start(int code) throws Exception { @@ -64,7 +66,7 @@ // 空闲检测处理器 触发空闲状态事件 读空闲:5秒 写空闲:7秒 读写空闲:10秒 pipeline.addLast(new IdleStateHandler(5,7,3, TimeUnit.SECONDS)); // 处理器 pipeline.addLast("handler", new NettyHandle()); // pipeline.addLast("handler", new NettyHandle()); } }); // • 调用Bootstrap.connect()来连接服务器 @@ -80,10 +82,73 @@ } public static void main(String[] args) { try { new NettyClient("221.182.45.100", 1000).start(1); } catch (Exception e) { throw new RuntimeException(e); byte[] bytess = ByteArrayUtil.hexStr2Bytes("5B0000001F0000002B1002010A66F80100000000000000000000200C8E075D"); ByteBuf msg = Unpooled.wrappedBuffer(bytess); byte[] readableBytes = new byte[msg.readableBytes()]; msg.readBytes(readableBytes); // 反转义处理 byte[] bytes = Jtt809Util.unescape(readableBytes); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); // 判断包头 if (byteBuf.readByte() != Jtt809Constant.PACKET_HEAD_FLAG) { byteBuf.resetReaderIndex(); return; } // crc校验 // if (!Jtt809Util.validate(byteBuf)) { // return; // } /* 解析外层包 */ // 长度 long length = byteBuf.readUnsignedInt(); // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比; // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等 // if (length != bytes.length) { // log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); // return; // } // 报文序列号 long sn = byteBuf.readUnsignedInt(); // 业务数据类型 int id = byteBuf.readUnsignedShort(); // 下级平台接入码 int gnsscenterId = byteBuf.readInt(); // 协议版本号标识 String version = "v" + byteBuf.readByte() + "." + byteBuf.readByte() + "." + byteBuf.readByte(); // 报文加密标识位 byte encryptFlag = byteBuf.readByte(); // 数据加密解密的密匙 long encryptKey = byteBuf.readUnsignedInt(); // 2019版 // String date = byteBuf.readByte() + "-" + byteBuf.readByte() + "-" + byteBuf.readShort() + " " + // byteBuf.readByte() + ":" + byteBuf.readByte() + ":" + byteBuf.readByte(); // long time = 0; // try { // time = DateUtils.parseDate(date, "dd-MM-yyyy HH:mm:ss").getTime(); // } catch (ParseException e) { // log.warn("日期 [{}] 解析错误", date); // } // 消息体 byte[] body; if (encryptFlag == 1) { byte[] encryptedBytes = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(encryptedBytes); // 解密 int[] param = GnssCenterService.getInstance().getDecryptParam(gnsscenterId); Jtt809Util.decrypt(param[0], param[1], param[2], encryptKey, encryptedBytes); body = encryptedBytes; } else { body = new byte[byteBuf.readableBytes() - 2]; byteBuf.readBytes(body); } ByteBuf buf = Unpooled.wrappedBuffer(body); int result = buf.readByte(); int verifyCode = buf.readInt(); // 校验码 int crcCode = byteBuf.readUnsignedShort(); //{"result":0,"verifyCode":658} } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
@@ -75,8 +75,8 @@ if (upConnectRsp.getResult() == 0x00) { //保存链路 ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel().id()); //从链路连接 downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); //从链路连接 TODO 临时注释掉 // downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); } else { ctx.close(); } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
@@ -21,7 +21,6 @@ import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; /** @@ -125,6 +124,7 @@ */ public void up_warn_msg_adpt_info(ChannelHandlerContext ctx, int inferiorPlatformId, WarnMsg warnMsg) { UPWarnMsgAdptInfo upWarnMsgAdptInfo = new UPWarnMsgAdptInfo().decode(warnMsg); upWarnMsgAdptInfo.setResult(0x00); upWarnMsgAdptInfo.setInferiorPlatformId(inferiorPlatformId); upWarnMsgAdptInfo.setCreateTime(LocalDateTime.now()); upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo); @@ -135,8 +135,7 @@ * 定时任务督办报警请求 */ public void taskUrgeTodo() { upWarnMsgAdptInfoDao.findAll() List<UPWarnMsgAdptInfo> list = new ArrayList<>(); List<UPWarnMsgAdptInfo> list = upWarnMsgAdptInfoDao.findByResultIs(0x00); for (UPWarnMsgAdptInfo upWarnMsgAdptInfo : list) { down_warn_msg_urge_todo_req(upWarnMsgAdptInfo); } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/jtt809/decoder/Jtt809Decoder.java
@@ -4,16 +4,15 @@ import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; import com.ruoyi.dataInterchange.util.jtt809.gnsscenter.GnssCenterService; import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;import io.netty.buffer.ByteBuf; import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateUtils; import java.text.ParseException; import java.util.List; /** @@ -51,10 +50,10 @@ long length = byteBuf.readUnsignedInt(); // 长度校验, 反转义之后数组加上包头和包尾长度与解析出来的长度对比; // 因为数据长度不包含校验码,而此时解析出来的数据不包含头尾标识,刚好都是2个字节,所以两个长度应该相等 if (length != bytes.length) { log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); return; } // if (length != bytes.length) { // log.warn("消息长度校验错误,报文解析出来长度为 {}, 实际可解析的长度为 {}", length, bytes.length); // return; // } // 报文序列号 long sn = byteBuf.readUnsignedInt(); // 业务数据类型 @@ -97,4 +96,5 @@ out.add(new OuterPacket(length, sn, id, gnsscenterId, version, encryptFlag, encryptKey, body, crcCode)); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/jtt809/encoder/Jtt809Encoder.java
@@ -1,5 +1,6 @@ package com.ruoyi.dataInterchange.util.jtt809.encoder; import com.ruoyi.dataInterchange.util.jtt809.common.ByteArrayUtil; import com.ruoyi.dataInterchange.util.jtt809.common.CRC16CCITT; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Constant; import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util; @@ -8,14 +9,12 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.ZoneOffset; import lombok.extern.slf4j.Slf4j; /** * @author tucke */ @Slf4j public class Jtt809Encoder extends MessageToByteEncoder<OuterPacket> { @@ -34,8 +33,8 @@ if (body == null) { body = new byte[0]; } // 32 = 头标识[1] + 数据头[30 = 长度[4] + 序列号[4] + 数据类型[2] + 接入码[4] + 版本号[3] + 加密标识[1] + 密钥[4]] + 时间[8] + 尾标识[1] int len = body.length + 32; // 26 = 头标识[1] + 数据头[22 = 长度[4] + 序列号[4] + 数据类型[2] + 接入码[4] + 版本号[3] + 加密标识[1] + 密钥[4]] + 校验码[2] + 尾标识[1] int len = body.length + 26; out.markReaderIndex(); // 数据长度 out.writeInt(len); @@ -73,6 +72,9 @@ out.writeBytes(Jtt809Util.escape(escapeBytes)); // 包尾标识 out.writeByte(Jtt809Constant.PACKET_END_FLAG); byte[] readableBytes = new byte[out.readableBytes()]; out.readBytes(readableBytes); log.info("下发数据包, packetLen : {}, packet : {}", readableBytes.length, ByteArrayUtil.bytes2HexStr(readableBytes)); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/jtt809/packet/common/OuterPacket.java
@@ -9,7 +9,6 @@ /** * @author tucke */ @SuppressWarnings("SpellCheckingInspection") @AllArgsConstructor @NoArgsConstructor @Data ruoyi-service/ruoyi-dataInterchange/src/test/java/com/ruoyi/dataInterchange/RuoYiOrderApplicationTests.java
New file @@ -0,0 +1,79 @@ //package com.ruoyi.dataInterchange; // //import com.ruoyi.dataInterchange.model.UPConnectRsp; //import com.ruoyi.dataInterchange.model.enu.DataType; //import com.ruoyi.dataInterchange.util.jtt809.decoder.Jtt809Decoder; //import com.ruoyi.dataInterchange.util.jtt809.encoder.Jtt809Encoder; //import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; //import io.netty.bootstrap.Bootstrap; //import io.netty.channel.*; //import io.netty.channel.nio.NioEventLoopGroup; //import io.netty.channel.socket.SocketChannel; //import io.netty.channel.socket.nio.NioSocketChannel; //import io.netty.handler.timeout.IdleStateHandler; //import org.junit.jupiter.api.Test; //import org.springframework.boot.test.context.SpringBootTest; // //import java.net.InetSocketAddress; //import java.util.concurrent.TimeUnit; // // //@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = RuoYiDataInterchangeApplication.class) //public class RuoYiOrderApplicationTests { // // @Test // public void test() { // EventLoopGroup nioEventLoopGroup = null; // try { // //创建Bootstrap对象用来引导启动客户端 // Bootstrap bootstrap = new Bootstrap(); // //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据 // nioEventLoopGroup = new NioEventLoopGroup(); // //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址 // bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress("127.0.0.1", 1000)) // .handler(new ChannelInitializer<SocketChannel>() { // //添加一个ChannelHandler,客户端成功连接服务器后就会被执行 // @Override // protected void initChannel(SocketChannel socketChannel) // throws Exception { // ChannelPipeline pipeline = socketChannel.pipeline(); // // 解码器 // pipeline.addLast("decoder", new Jtt809Decoder()); // // 编码器 // pipeline.addLast("encoder", new Jtt809Encoder()); // // 空闲检测处理器 触发空闲状态事件 读空闲:5秒 写空闲:7秒 读写空闲:10秒 // pipeline.addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS)); // // 处理器 //// pipeline.addLast("handler", new NettyHandle()); // } // }); // // • 调用Bootstrap.connect()来连接服务器 // ChannelFuture f = bootstrap.connect().sync(); // //将通道添加到缓存中,便于后期直接使用 // Channel channel = f.channel(); // // • 最后关闭EventLoopGroup来释放资源 // f.channel().closeFuture().sync(); // // // //解析封装原元数据 // UPConnectRsp upConnectRsp = new UPConnectRsp(); // upConnectRsp.setResult(0x00); // // 随机一个校验码 // upConnectRsp.setVerifyCode(1234); // //主链路登录应答 // byte[] body = upConnectRsp.encode(); // OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(), body); // channel.writeAndFlush(out); // channel.flush(); // // } catch (Exception e) { // e.printStackTrace(); // } finally { // try { // nioEventLoopGroup.shutdownGracefully().sync(); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // } // } //} ruoyi-service/ruoyi-dataInterchange/src/test/java/com/ruoyi/dataInterchange/order/RuoYiOrderApplicationTests.java
File was deleted