From 4ad32c37e3317fb6cb7cbfb034b53d3220c27db4 Mon Sep 17 00:00:00 2001 From: puzhibing <393733352@qq.com> Date: 星期五, 30 五月 2025 17:59:16 +0800 Subject: [PATCH] 优化报警 --- ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java | 105 +++++++++++++++------------------------------------- 1 files changed, 30 insertions(+), 75 deletions(-) diff --git a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java index 3ff42ad..f6ab282 100644 --- a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java +++ b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java @@ -1,7 +1,6 @@ package com.ruoyi.dataInterchange.server; import com.alibaba.fastjson.JSON; -import com.ruoyi.dataInterchange.model.DOWNConnectReq; import com.ruoyi.dataInterchange.model.DOWNDisconnectInform; import com.ruoyi.dataInterchange.model.UPConnectReq; import com.ruoyi.dataInterchange.model.UPConnectRsp; @@ -9,20 +8,20 @@ import com.ruoyi.dataInterchange.netty.client.ChannelMap; import com.ruoyi.dataInterchange.netty.client.NettyClient; import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; -import com.ruoyi.dataInterchange.wapper.UPConnect; import com.ruoyi.system.api.feignClient.EnterpriseClient; import com.ruoyi.system.api.model.Enterprise; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Random; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -36,6 +35,9 @@ @Resource private EnterpriseClient enterpriseClient; + @Resource + private RedisTemplate redisTemplate; + /** * 主链路登录请求业务逻辑 @@ -46,6 +48,7 @@ ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody()); //解析封装原元数据 UPConnectReq upConnectReq = new UPConnectReq().decode(byteBuf); + log.info("主链路登录请求体:" + JSON.toJSONString(upConnectReq)); Enterprise data = enterpriseClient.getEnterprise(upConnectReq.getUserId() + "").getData(); UPConnectRsp upConnectRsp = new UPConnectRsp(); if (null == data) { @@ -69,14 +72,15 @@ //主链路登录应答 log.info("主链路登录应答({}):{}", DataType.UP_CONNECT_RSP.getCode(), JSON.toJSONString(upConnectRsp)); byte[] body = upConnectRsp.encode(); - OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(), body); + OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(),outerPacket.getGnsscenterId() , body); ctx.writeAndFlush(out); ctx.flush(); if (upConnectRsp.getResult() == 0x00) { + redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES); //保存链路 - ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel().id()); - //从链路连接 TODO 临时注释掉 -// downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); + ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel()); + //从链路连接 + downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); } else { ctx.close(); } @@ -93,11 +97,8 @@ public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) { try { boolean b = downConnect(inferiorPlatformId, host, port, verifyCode); - if (b) { - downLinkTest(inferiorPlatformId); - } } catch (Exception e) { - downDisconnectInform(ctx, 0x00); + downDisconnectInform(ctx, inferiorPlatformId, 0x00); throw new RuntimeException(e); } } @@ -112,70 +113,24 @@ */ public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) { try { - new NettyClient(host, port).start(inferiorPlatformId); - //构建从链路请求 - DOWNConnectReq downConnectReq = new DOWNConnectReq(); - downConnectReq.setVerifyCode(verifyCode); - //获取从链路通道 - Channel channel = ChannelMap.getClientChannel(inferiorPlatformId); - if (null != channel && channel.isActive()) { - log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq)); - byte[] body = downConnectReq.encode(); - OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body); - channel.writeAndFlush(out); - channel.flush(); - - //缓存从链路地址 - UPConnect upConnect = new UPConnect(); - upConnect.setDownLinkIp(host); - upConnect.setDownLinkPort(port); - upConnect.setVerifyCode(verifyCode); - ChannelMap.addIpAndPort(inferiorPlatformId, upConnect); - return true; - } + ExecutorService executorService = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + new NettyClient(host, port).start(inferiorPlatformId, verifyCode); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + return true; } catch (Exception e) { e.printStackTrace(); } return false; - } - - - /** - * 从链路连接保持请求 - * - * @param inferiorPlatformId - */ - public void downLinkTest(int inferiorPlatformId) { - //创建定时任务间隔发送链接保持请求 - ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - //获取从链路通道 - Channel channel = ChannelMap.getClientChannel(inferiorPlatformId); - if (null != channel && channel.isActive()) { - log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), ""); - OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null); - channel.writeAndFlush(out); - channel.flush(); - } else { - //记录失败次数,然后再重新连接 - int times = ChannelMap.getTimes(inferiorPlatformId); - if (times >= 18) { - UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId); - boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode()); - if (b) { - times = 0; - } else { - times++; - } - } else { - times++; - } - ChannelMap.saveTimes(inferiorPlatformId, times); - } - } - }, 10, 10, TimeUnit.SECONDS); } @@ -185,12 +140,12 @@ * @param ctx * @param errorCode */ - public void downDisconnectInform(ChannelHandlerContext ctx, int errorCode) { + public void downDisconnectInform(ChannelHandlerContext ctx, int inferiorPlatformId, int errorCode) { DOWNDisconnectInform downDisconnectInform = new DOWNDisconnectInform(); downDisconnectInform.setErrorCode(errorCode); log.info("从链路断开通知({}):{}", DataType.DOWN_DISCONNECT_INFORM.getCode(), JSON.toJSONString(downDisconnectInform)); byte[] body = downDisconnectInform.encode(); - OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body); + OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), inferiorPlatformId, body); ctx.writeAndFlush(out); ctx.flush(); } -- Gitblit v1.7.1