Pu Zhibing
2025-03-14 3c66b754ee314ae87d0f2eda2fa86a30ea2304e7
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java
@@ -1,16 +1,29 @@
package com.ruoyi.dataInterchange.server;
import com.ruoyi.dataInterchange.model.DOWNConnectRsp;
import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Handles the subordinate-link connection response.
 *
 * @author zhibing.pu
 * @Date 2025/3/6 16:11
 */
@@ -18,15 +31,68 @@
@Component
public class DOWNConnectRspService {
   
   @Resource
   private RedisTemplate redisTemplate;
   @Resource
   private ConnectReqService connectReqService;
   private Map<Integer, ScheduledExecutorService> scheduledMap = new HashMap<>();
   
   /**
    * 从链路连接应答
    *
    * @param ctx
    * @param outerPacket
    */
   public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket){
   public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket) {
      ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
      DOWNConnectRsp downConnectRsp = new DOWNConnectRsp().decode(byteBuf);
      //从链路保持请求
      downLinkTest(outerPacket.getGnsscenterId());
   }
   /**
    * 从链路连接保持请求
    *
    * @param inferiorPlatformId
    */
   public void downLinkTest(int inferiorPlatformId) {
      ScheduledExecutorService scheduledExecutorService = scheduledMap.get(inferiorPlatformId);
      if (null == scheduledExecutorService) {
         //创建定时任务间隔发送链接保持请求
         scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
         scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
               //获取从链路通道
               Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
               if (null != channel && channel.isActive()) {
                  OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
                  channel.writeAndFlush(out);
                  log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
                  redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES);
               } else {
                  //记录失败次数,然后再重新连接
                  int times = ChannelMap.getTimes(inferiorPlatformId);
                  if (times >= 18) {
                     UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                     boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                     if (b) {
                        times = 0;
                     } else {
                        times++;
                     }
                  } else {
                     times++;
                  }
                  ChannelMap.saveTimes(inferiorPlatformId, times);
               }
            }
         }, 10, 10, TimeUnit.SECONDS);
         scheduledMap.put(inferiorPlatformId, scheduledExecutorService);
      }
   }
}