package com.ruoyi.dataInterchange.server;
|
|
import com.ruoyi.dataInterchange.model.DOWNConnectRsp;
|
import com.ruoyi.dataInterchange.model.enu.DataType;
|
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
|
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
|
import com.ruoyi.dataInterchange.wapper.UPConnect;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelHandlerContext;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.TimeUnit;
|
|
|
/**
|
* 从链路连接应答
|
*
|
* @author zhibing.pu
|
* @Date 2025/3/6 16:11
|
*/
|
@Slf4j
|
@Component
|
public class DOWNConnectRspService {
|
|
@Resource
|
private RedisTemplate redisTemplate;
|
|
@Resource
|
private ConnectReqService connectReqService;
|
|
private Map<Integer, ScheduledExecutorService> scheduledMap = new HashMap<>();
|
|
|
/**
|
* 从链路连接应答
|
*
|
* @param ctx
|
* @param outerPacket
|
*/
|
public void connectRsp(ChannelHandlerContext ctx, OuterPacket outerPacket) {
|
ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
|
DOWNConnectRsp downConnectRsp = new DOWNConnectRsp().decode(byteBuf);
|
//从链路保持请求
|
downLinkTest(outerPacket.getGnsscenterId());
|
}
|
|
|
/**
|
* 从链路连接保持请求
|
*
|
* @param inferiorPlatformId
|
*/
|
public void downLinkTest(int inferiorPlatformId) {
|
ScheduledExecutorService scheduledExecutorService = scheduledMap.get(inferiorPlatformId);
|
if (null == scheduledExecutorService) {
|
//创建定时任务间隔发送链接保持请求
|
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
|
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
|
@Override
|
public void run() {
|
//获取从链路通道
|
Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
|
if (null != channel && channel.isActive()) {
|
OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), inferiorPlatformId, null);
|
channel.writeAndFlush(out);
|
log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
|
redisTemplate.opsForValue().set("login:" + inferiorPlatformId, System.currentTimeMillis(), 1, TimeUnit.MINUTES);
|
} else {
|
//记录失败次数,然后再重新连接
|
int times = ChannelMap.getTimes(inferiorPlatformId);
|
if (times >= 18) {
|
UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
|
boolean b = connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
|
if (b) {
|
times = 0;
|
} else {
|
times++;
|
}
|
} else {
|
times++;
|
}
|
ChannelMap.saveTimes(inferiorPlatformId, times);
|
}
|
}
|
}, 10, 10, TimeUnit.SECONDS);
|
scheduledMap.put(inferiorPlatformId, scheduledExecutorService);
|
}
|
}
|
}
|