Pu Zhibing
8 小时以前 b554a106bc1b06e0320b642d16c31554ae558301
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/DOWNConnectRspService.java
@@ -1,16 +1,32 @@
package com.ruoyi.dataInterchange.server;
import com.ruoyi.dataInterchange.model.DOWNConnectRsp;import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.DOWNConnectRsp;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 从链路连接应答
 *
 * @author zhibing.pu
 * @Date 2025/3/6 16:11
 */
@@ -18,15 +34,60 @@
@Component
public class DOWNConnectRspService {
   
   @Resource
   private RedisTemplate redisTemplate;
   @Resource
   private ConnectReqService connectReqService;
   @Resource
   private EnterpriseClient enterpriseClient;
   
   /**
    * 从链路连接应答
    *
    * @param ctx
    * @param outerPacket
    */
   public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket){
   public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket) {
      ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
      DOWNConnectRsp downConnectRsp = new DOWNConnectRsp().decode(byteBuf);
   }
   /**
    * 从链路连接保持请求
    *
    */
   public void downLinkTest(Integer inferiorPlatformId) {
      //获取从链路通道
      Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
      if (null != channel && channel.isActive()) {
         OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), inferiorPlatformId, null);
         channel.writeAndFlush(out);
         log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
         redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES);
      } else {
         //记录失败次数,然后再重新连接
         Integer times = ChannelMap.getTimes(inferiorPlatformId);
         if(null == times){
            times = 0;
         }
         if (times >= 18) {
            UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
            if(null != ipAndPort){
               boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
               if (b) {
                  times = 0;
               } else {
                  times++;
               }
            }
         } else {
            times++;
         }
         ChannelMap.saveTimes(inferiorPlatformId, times);
      }
   }
}