From 3c66b754ee314ae87d0f2eda2fa86a30ea2304e7 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期五, 14 三月 2025 18:32:30 +0800 Subject: [PATCH] 修改809对接bug --- ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java | 72 ++++++++++++++++++++++++++++++++++- 1 files changed, 69 insertions(+), 3 deletions(-) diff --git a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java index 26f52a7..3497ff4 100644 --- a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java +++ b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java @@ -1,16 +1,29 @@ package com.ruoyi.dataInterchange.server; -import com.ruoyi.dataInterchange.model.DOWNConnectRsp;import com.ruoyi.dataInterchange.model.UPConnectReq; +import com.ruoyi.dataInterchange.model.DOWNConnectRsp; +import com.ruoyi.dataInterchange.model.enu.DataType; +import com.ruoyi.dataInterchange.netty.client.ChannelMap; import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; +import com.ruoyi.dataInterchange.wapper.UPConnect; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * 从链路连接应答 + * * @author zhibing.pu * @Date 2025/3/6 16:11 */ @@ -18,15 +31,68 @@ @Component public class DOWNConnectRspService { + @Resource + private RedisTemplate redisTemplate; + + @Resource + private ConnectReqService connectReqService; + + private Map<Integer, ScheduledExecutorService> scheduledMap = new HashMap<>(); + /** * 从链路连接应答 + * * @param ctx * @param outerPacket */ - public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket){ + public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket) { ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody()); DOWNConnectRsp downConnectRsp = new DOWNConnectRsp().decode(byteBuf); - + //从链路保持请求 + downLinkTest(outerPacket.getGnsscenterId()); + } + + + /** + * 从链路连接保持请求 + * + * @param inferiorPlatformId + */ + public void downLinkTest(int inferiorPlatformId) { + ScheduledExecutorService scheduledExecutorService = scheduledMap.get(inferiorPlatformId); + if (null == scheduledExecutorService) { + //创建定时任务间隔发送链接保持请求 + scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + //获取从链路通道 + Channel channel = ChannelMap.getClientChannel(inferiorPlatformId); + if (null != channel && channel.isActive()) { + OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null); + channel.writeAndFlush(out); + log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), ""); + redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES); + } else { + //记录失败次数,然后再重新连接 + int times = ChannelMap.getTimes(inferiorPlatformId); + if (times >= 18) { + UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId); + boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode()); + if (b) { + times = 0; + } else { + times++; + } + } else { + times++; + } + ChannelMap.saveTimes(inferiorPlatformId, times); + } + } + }, 10, 10, TimeUnit.SECONDS); + scheduledMap.put(inferiorPlatformId, scheduledExecutorService); + } } } -- Gitblit v1.7.1