| | |
| | | package com.ruoyi.dataInterchange.server; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.ruoyi.dataInterchange.model.DOWNConnectReq; |
| | | import com.ruoyi.dataInterchange.model.DOWNDisconnectInform; |
| | | import com.ruoyi.dataInterchange.model.UPConnectReq; |
| | | import com.ruoyi.dataInterchange.model.UPConnectRsp; |
| | |
| | | import com.ruoyi.dataInterchange.netty.client.ChannelMap; |
| | | import com.ruoyi.dataInterchange.netty.client.NettyClient; |
| | | import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket; |
| | | import com.ruoyi.dataInterchange.wapper.UPConnect; |
| | | import com.ruoyi.system.api.feignClient.EnterpriseClient; |
| | | import com.ruoyi.system.api.model.Enterprise; |
| | | import io.netty.buffer.ByteBuf; |
| | | import io.netty.buffer.Unpooled; |
| | | import io.netty.channel.Channel; |
| | | import io.netty.channel.ChannelHandlerContext; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.Random; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledThreadPoolExecutor; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | |
| | | |
| | | @Resource |
| | | private EnterpriseClient enterpriseClient; |
| | | |
| | | @Resource |
| | | private RedisTemplate redisTemplate; |
| | | |
| | | |
| | | /** |
| | |
| | | ctx.writeAndFlush(out); |
| | | ctx.flush(); |
| | | if (upConnectRsp.getResult() == 0x00) { |
| | | redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES); |
| | | //保存链路 |
| | | ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel()); |
| | | //从链路连接 |
| | | // downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); |
| | | downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode); |
| | | } else { |
| | | ctx.close(); |
| | | } |
| | |
| | | public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) { |
| | | try { |
| | | boolean b = downConnect(inferiorPlatformId, host, port, verifyCode); |
| | | if (b) { |
| | | downLinkTest(inferiorPlatformId); |
| | | } |
| | | |
| | | } catch (Exception e) { |
| | | downDisconnectInform(ctx, 0x00); |
| | | throw new RuntimeException(e); |
| | |
| | | */ |
| | | public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) { |
| | | try { |
| | | new NettyClient(host, port).start(inferiorPlatformId); |
| | | //构建从链路请求 |
| | | DOWNConnectReq downConnectReq = new DOWNConnectReq(); |
| | | downConnectReq.setVerifyCode(verifyCode); |
| | | //获取从链路通道 |
| | | Channel channel = ChannelMap.getClientChannel(inferiorPlatformId); |
| | | if (null != channel && channel.isActive()) { |
| | | log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq)); |
| | | byte[] body = downConnectReq.encode(); |
| | | OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body); |
| | | channel.writeAndFlush(out); |
| | | channel.flush(); |
| | | |
| | | //缓存从链路地址 |
| | | UPConnect upConnect = new UPConnect(); |
| | | upConnect.setDownLinkIp(host); |
| | | upConnect.setDownLinkPort(port); |
| | | upConnect.setVerifyCode(verifyCode); |
| | | ChannelMap.addIpAndPort(inferiorPlatformId, upConnect); |
| | | return true; |
| | | } |
| | | ExecutorService executorService = new ThreadPoolExecutor(1, 1, |
| | | 0L, TimeUnit.MILLISECONDS, |
| | | new LinkedBlockingQueue<Runnable>()); |
| | | executorService.execute(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | new NettyClient(host, port).start(inferiorPlatformId, verifyCode); |
| | | } catch (Exception e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | } |
| | | }); |
| | | return true; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 从链路连接保持请求 |
| | | * |
| | | * @param inferiorPlatformId |
| | | */ |
| | | public void downLinkTest(int inferiorPlatformId) { |
| | | //创建定时任务间隔发送链接保持请求 |
| | | ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); |
| | | scheduledExecutorService.scheduleAtFixedRate(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | //获取从链路通道 |
| | | Channel channel = ChannelMap.getClientChannel(inferiorPlatformId); |
| | | if (null != channel && channel.isActive()) { |
| | | log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), ""); |
| | | OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null); |
| | | channel.writeAndFlush(out); |
| | | channel.flush(); |
| | | } else { |
| | | //记录失败次数,然后再重新连接 |
| | | int times = ChannelMap.getTimes(inferiorPlatformId); |
| | | if (times >= 18) { |
| | | UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId); |
| | | boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode()); |
| | | if (b) { |
| | | times = 0; |
| | | } else { |
| | | times++; |
| | | } |
| | | } else { |
| | | times++; |
| | | } |
| | | ChannelMap.saveTimes(inferiorPlatformId, times); |
| | | } |
| | | } |
| | | }, 10, 10, TimeUnit.SECONDS); |
| | | } |
| | | |
| | | |