Pu Zhibing
8 小时以前 b554a106bc1b06e0320b642d16c31554ae558301
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java
@@ -5,6 +5,8 @@
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -15,6 +17,7 @@
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -36,8 +39,9 @@
   
   @Resource
   private ConnectReqService connectReqService;
   private Map<Integer, ScheduledExecutorService> scheduledMap = new HashMap<>();
   @Resource
   private EnterpriseClient enterpriseClient;
   
   
   /**
@@ -49,50 +53,41 @@
   public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket) {
      ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
      DOWNConnectRsp downConnectRsp = new DOWNConnectRsp().decode(byteBuf);
      //从链路保持请求
      downLinkTest(outerPacket.getGnsscenterId());
   }
   
   
   /**
    * 从链路连接保持请求
    *
    * @param inferiorPlatformId
    */
   public void downLinkTest(int inferiorPlatformId) {
      ScheduledExecutorService scheduledExecutorService = scheduledMap.get(inferiorPlatformId);
      if (null == scheduledExecutorService) {
         //创建定时任务间隔发送链接保持请求
         scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
         scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
               //获取从链路通道
               Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
               if (null != channel && channel.isActive()) {
                  OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), inferiorPlatformId, null);
                  channel.writeAndFlush(out);
                  log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
                  redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES);
   public void downLinkTest(Integer inferiorPlatformId) {
      //获取从链路通道
      Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
      if (null != channel && channel.isActive()) {
         OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), inferiorPlatformId, null);
         channel.writeAndFlush(out);
         log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
         redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES);
      } else {
         //记录失败次数,然后再重新连接
         Integer times = ChannelMap.getTimes(inferiorPlatformId);
         if(null == times){
            times = 0;
         }
         if (times >= 18) {
            UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
            if(null != ipAndPort){
               boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
               if (b) {
                  times = 0;
               } else {
                  //记录失败次数,然后再重新连接
                  int times = ChannelMap.getTimes(inferiorPlatformId);
                  if (times >= 18) {
                     UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                     boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                     if (b) {
                        times = 0;
                     } else {
                        times++;
                     }
                  } else {
                     times++;
                  }
                  ChannelMap.saveTimes(inferiorPlatformId, times);
                  times++;
               }
            }
         }, 10, 10, TimeUnit.SECONDS);
         scheduledMap.put(inferiorPlatformId, scheduledExecutorService);
         } else {
            times++;
         }
         ChannelMap.saveTimes(inferiorPlatformId, times);
      }
   }
}