Pu Zhibing
20 小时以前 185b6c7f14d8324540f39913e2b92146636cc5e0
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/ConnectReqService.java
@@ -1,7 +1,6 @@
package com.ruoyi.dataInterchange.server;
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.DOWNDisconnectInform;
import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.UPConnectRsp;
@@ -9,20 +8,20 @@
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.netty.client.NettyClient;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -36,6 +35,9 @@
   // Injected client used to look up the Enterprise record for the user id carried in a login request
   @Resource
   private EnterpriseClient enterpriseClient;
   
   // Injected Redis access; used after a successful login to store a "login:<gnsscenterId>" timestamp with a one-minute expiry
   @Resource
   private RedisTemplate redisTemplate;
   
   /**
    * 主链路登录请求业务逻辑
@@ -46,6 +48,7 @@
      ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
      //解析封装原元数据
      UPConnectReq upConnectReq = new UPConnectReq().decode(byteBuf);
      log.info("主链路登录请求体:" + JSON.toJSONString(upConnectReq));
      Enterprise data = enterpriseClient.getEnterprise(upConnectReq.getUserId() + "").getData();
      UPConnectRsp upConnectRsp = new UPConnectRsp();
      if (null == data) {
@@ -69,14 +72,15 @@
      //主链路登录应答
      log.info("主链路登录应答({}):{}", DataType.UP_CONNECT_RSP.getCode(), JSON.toJSONString(upConnectRsp));
      byte[] body = upConnectRsp.encode();
      OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(), body);
      OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(),outerPacket.getGnsscenterId() , body);
      ctx.writeAndFlush(out);
      ctx.flush();
      if (upConnectRsp.getResult() == 0x00) {
         redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES);
         //保存链路
         ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel().id());
         //从链路连接 TODO 临时注释掉
//         downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
         ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel());
         //从链路连接
         downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
      } else {
         ctx.close();
      }
@@ -93,11 +97,8 @@
   public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) {
      // Opens the subordinate link to host:port for the given platform and, when the
      // connect call reports success, starts the periodic keep-alive task.
      // The four-argument overload catches its own exceptions and returns false, so the
      // catch below is reached only by failures it does not handle itself
      // (for example, from downLinkTest).
      try {
         boolean b = downConnect(inferiorPlatformId, host, port, verifyCode);
         if (b) {
            downLinkTest(inferiorPlatformId);
         }
      } catch (Exception e) {
         // Tell the peer on the main link that the subordinate link is down (error code 0x00),
         // then surface the failure to the caller as an unchecked exception.
         // NOTE(review): the two calls below use different arities and look like the old and
         // new revision of the same statement listed together - confirm which one is current.
         downDisconnectInform(ctx, 0x00);
         downDisconnectInform(ctx, inferiorPlatformId, 0x00);
         throw new RuntimeException(e);
      }
   }
@@ -112,70 +113,24 @@
    */
   public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) {
      // Connects the subordinate link for the given platform and reports whether the attempt
      // was issued: true when the request was written or the async start was submitted,
      // false when an exception was caught here.
      // NOTE(review): this listing appears to contain two revisions of the body back to back
      // (a synchronous NettyClient start followed by an executor-based start) - confirm which
      // one is current before relying on the flow described below.
      try {
         new NettyClient(host, port).start(inferiorPlatformId);
         // Build the subordinate-link connect request
         DOWNConnectReq downConnectReq = new DOWNConnectReq();
         downConnectReq.setVerifyCode(verifyCode);
         // Look up the subordinate-link channel registered for this platform
         Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
         if (null != channel && channel.isActive()) {
            log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
            byte[] body = downConnectReq.encode();
            OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
            channel.writeAndFlush(out);
            channel.flush();
            // Cache the subordinate-link address and verify code so a later reconnect can reuse them
            UPConnect upConnect = new UPConnect();
            upConnect.setDownLinkIp(host);
            upConnect.setDownLinkPort(port);
            upConnect.setVerifyCode(verifyCode);
            ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
            return true;
         }
         // A new single-thread pool with an unbounded queue is created on every call and is
         // not shut down anywhere in this method.
         ExecutorService executorService = new ThreadPoolExecutor(1, 1,
               0L, TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<Runnable>());
         executorService.execute(new Runnable() {
            @Override
            public void run() {
               try {
                  new NettyClient(host, port).start(inferiorPlatformId, verifyCode);
               } catch (Exception e) {
                  // Thrown on the pool thread; it does not reach the outer catch of downConnect
                  throw new RuntimeException(e);
               }
            }
         });
         // Returned as soon as the task is submitted, regardless of whether the client connects
         return true;
      } catch (Exception e) {
         // NOTE(review): the failure is only printed to stderr, not sent through the logger
         e.printStackTrace();
      }
      return false;
   }
   /**
    * 从链路连接保持请求
    *
    * @param inferiorPlatformId
    */
   public void downLinkTest(int inferiorPlatformId) {
      //创建定时任务间隔发送链接保持请求
      ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
      scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
         @Override
         public void run() {
            //获取从链路通道
            Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
            if (null != channel && channel.isActive()) {
               log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
               OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
               channel.writeAndFlush(out);
               channel.flush();
            } else {
               //记录失败次数,然后再重新连接
               int times = ChannelMap.getTimes(inferiorPlatformId);
               if (times >= 18) {
                  UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                  boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                  if (b) {
                     times = 0;
                  } else {
                     times++;
                  }
               } else {
                  times++;
               }
               ChannelMap.saveTimes(inferiorPlatformId, times);
            }
         }
      }, 10, 10, TimeUnit.SECONDS);
   }
   
   
@@ -185,12 +140,12 @@
    * @param ctx
    * @param errorCode
    */
   public void downDisconnectInform(ChannelHandlerContext ctx, int errorCode) {
   public void downDisconnectInform(ChannelHandlerContext ctx, int inferiorPlatformId, int errorCode) {
      // Builds a subordinate-link disconnect notification carrying errorCode and writes it to ctx.
      // NOTE(review): two signatures and two "OuterPacket out" declarations are listed back to
      // back here; they look like the old and new revision of the same lines - confirm which
      // pair is current.
      DOWNDisconnectInform downDisconnectInform = new DOWNDisconnectInform();
      downDisconnectInform.setErrorCode(errorCode);
      log.info("从链路断开通知({}):{}", DataType.DOWN_DISCONNECT_INFORM.getCode(), JSON.toJSONString(downDisconnectInform));
      byte[] body = downDisconnectInform.encode();
      // NOTE(review): the packet is created with the DOWN_CONNECT_REQ code although the log line
      // above reports DOWN_DISCONNECT_INFORM - verify which message id is intended.
      OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
      OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), inferiorPlatformId, body);
      ctx.writeAndFlush(out);
      ctx.flush();
   }