Pu Zhibing
2025-03-14 3c66b754ee314ae87d0f2eda2fa86a30ea2304e7
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
@@ -1,7 +1,6 @@
package com.ruoyi.dataInterchange.server;
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.DOWNDisconnectInform;
import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.UPConnectRsp;
@@ -9,20 +8,20 @@
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.netty.client.NettyClient;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -35,6 +34,9 @@
   
   @Resource
   private EnterpriseClient enterpriseClient;
   @Resource
   private RedisTemplate redisTemplate;
   
   
   /**
@@ -74,10 +76,11 @@
      ctx.writeAndFlush(out);
      ctx.flush();
      if (upConnectRsp.getResult() == 0x00) {
         redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES);
         //保存链路
         ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel());
         //从链路连接
//         downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
         downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
      } else {
         ctx.close();
      }
@@ -94,9 +97,7 @@
   public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) {
      try {
         boolean b = downConnect(inferiorPlatformId, host, port, verifyCode);
         if (b) {
            downLinkTest(inferiorPlatformId);
         }
      } catch (Exception e) {
         downDisconnectInform(ctx, 0x00);
         throw new RuntimeException(e);
@@ -113,70 +114,24 @@
    */
   public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) {
      try {
         new NettyClient(host, port).start(inferiorPlatformId);
         //构建从链路请求
         DOWNConnectReq downConnectReq = new DOWNConnectReq();
         downConnectReq.setVerifyCode(verifyCode);
         //获取从链路通道
         Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
         if (null != channel && channel.isActive()) {
            log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
            byte[] body = downConnectReq.encode();
            OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
            channel.writeAndFlush(out);
            channel.flush();
            //缓存从链路地址
            UPConnect upConnect = new UPConnect();
            upConnect.setDownLinkIp(host);
            upConnect.setDownLinkPort(port);
            upConnect.setVerifyCode(verifyCode);
            ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
            return true;
         }
         ExecutorService executorService = new ThreadPoolExecutor(1, 1,
               0L, TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<Runnable>());
         executorService.execute(new Runnable() {
            @Override
            public void run() {
               try {
                  new NettyClient(host, port).start(inferiorPlatformId, verifyCode);
               } catch (Exception e) {
                  throw new RuntimeException(e);
               }
            }
         });
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
   /**
    * 从链路连接保持请求
    *
    * @param inferiorPlatformId
    */
   public void downLinkTest(int inferiorPlatformId) {
      //创建定时任务间隔发送链接保持请求
      ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
      scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
         @Override
         public void run() {
            //获取从链路通道
            Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
            if (null != channel && channel.isActive()) {
               log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
               OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
               channel.writeAndFlush(out);
               channel.flush();
            } else {
               //记录失败次数,然后再重新连接
               int times = ChannelMap.getTimes(inferiorPlatformId);
               if (times >= 18) {
                  UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                  boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                  if (b) {
                     times = 0;
                  } else {
                     times++;
                  }
               } else {
                  times++;
               }
               ChannelMap.saveTimes(inferiorPlatformId, times);
            }
         }
      }, 10, 10, TimeUnit.SECONDS);
   }