From 3c66b754ee314ae87d0f2eda2fa86a30ea2304e7 Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期五, 14 三月 2025 18:32:30 +0800
Subject: [PATCH] 修改809对接bug

---
 ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java |   93 ++++++++++++----------------------------------
 1 files changed, 24 insertions(+), 69 deletions(-)

diff --git a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
index 935fca0..2ad7944 100644
--- a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
+++ b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
@@ -1,7 +1,6 @@
 package com.ruoyi.dataInterchange.server;
 
 import com.alibaba.fastjson.JSON;
-import com.ruoyi.dataInterchange.model.DOWNConnectReq;
 import com.ruoyi.dataInterchange.model.DOWNDisconnectInform;
 import com.ruoyi.dataInterchange.model.UPConnectReq;
 import com.ruoyi.dataInterchange.model.UPConnectRsp;
@@ -9,20 +8,20 @@
 import com.ruoyi.dataInterchange.netty.client.ChannelMap;
 import com.ruoyi.dataInterchange.netty.client.NettyClient;
 import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
-import com.ruoyi.dataInterchange.wapper.UPConnect;
 import com.ruoyi.system.api.feignClient.EnterpriseClient;
 import com.ruoyi.system.api.model.Enterprise;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -35,6 +34,9 @@
 	
 	@Resource
 	private EnterpriseClient enterpriseClient;
+	
+	@Resource
+	private RedisTemplate redisTemplate;
 	
 	
 	/**
@@ -74,10 +76,11 @@
 		ctx.writeAndFlush(out);
 		ctx.flush();
 		if (upConnectRsp.getResult() == 0x00) {
+			redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES);
 			//保存链路
 			ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel());
 			//从链路连接
-//			downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
+			downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
 		} else {
 			ctx.close();
 		}
@@ -94,9 +97,7 @@
 	public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) {
 		try {
 			boolean b = downConnect(inferiorPlatformId, host, port, verifyCode);
-			if (b) {
-				downLinkTest(inferiorPlatformId);
-			}
+			
 		} catch (Exception e) {
 			downDisconnectInform(ctx, 0x00);
 			throw new RuntimeException(e);
@@ -113,70 +114,24 @@
 	 */
 	public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) {
 		try {
-			new NettyClient(host, port).start(inferiorPlatformId);
-			//构建从链路请求
-			DOWNConnectReq downConnectReq = new DOWNConnectReq();
-			downConnectReq.setVerifyCode(verifyCode);
-			//获取从链路通道
-			Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
-			if (null != channel && channel.isActive()) {
-				log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
-				byte[] body = downConnectReq.encode();
-				OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
-				channel.writeAndFlush(out);
-				channel.flush();
-				
-				//缓存从链路地址
-				UPConnect upConnect = new UPConnect();
-				upConnect.setDownLinkIp(host);
-				upConnect.setDownLinkPort(port);
-				upConnect.setVerifyCode(verifyCode);
-				ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
-				return true;
-			}
+			ExecutorService executorService = new ThreadPoolExecutor(1, 1,
+					0L, TimeUnit.MILLISECONDS,
+					new LinkedBlockingQueue<Runnable>());
+			executorService.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						new NettyClient(host, port).start(inferiorPlatformId, verifyCode);
+					} catch (Exception e) {
+						throw new RuntimeException(e);
+					}
+				}
+			});
+			return true;
 		} catch (Exception e) {
 			e.printStackTrace();
 		}
 		return false;
-	}
-	
-	
-	/**
-	 * 从链路连接保持请求
-	 *
-	 * @param inferiorPlatformId
-	 */
-	public void downLinkTest(int inferiorPlatformId) {
-		//创建定时任务间隔发送链接保持请求
-		ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-		scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-			@Override
-			public void run() {
-				//获取从链路通道
-				Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
-				if (null != channel && channel.isActive()) {
-					log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
-					OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
-					channel.writeAndFlush(out);
-					channel.flush();
-				} else {
-					//记录失败次数,然后再重新连接
-					int times = ChannelMap.getTimes(inferiorPlatformId);
-					if (times >= 18) {
-						UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
-						boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
-						if (b) {
-							times = 0;
-						} else {
-							times++;
-						}
-					} else {
-						times++;
-					}
-					ChannelMap.saveTimes(inferiorPlatformId, times);
-				}
-			}
-		}, 10, 10, TimeUnit.SECONDS);
 	}
 	
 	

--
Gitblit v1.7.1