Pu Zhibing
2025-03-11 19c823c2c8df4cceee0d827fb598e7f0d9b69154
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package com.ruoyi.dataInterchange.server;
 
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNConnectReq;
import com.ruoyi.dataInterchange.model.DOWNDisconnectInform;
import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.UPConnectRsp;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.netty.client.NettyClient;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * @author zhibing.pu
 * @Date 2025/3/4 18:40
 */
@Slf4j
@Component
public class ConnectReqService {
    
    @Resource
    private EnterpriseClient enterpriseClient;
    
    
    /**
     * 主链路登录请求业务逻辑
     *
     * @param ctx
     */
    public void connect(ChannelHandlerContext ctx, OuterPacket outerPacket) {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
        //解析封装原元数据
        UPConnectReq upConnectReq = new UPConnectReq().decode(byteBuf);
        log.info("主链路登录请求体:" + JSON.toJSONString(upConnectReq));
        Enterprise data = enterpriseClient.getEnterprise(upConnectReq.getUserId() + "").getData();
        UPConnectRsp upConnectRsp = new UPConnectRsp();
        if (null == data) {
            //用户没有注册
            upConnectRsp.setResult(0x03);
        } else {
            //IP地址不对
            if (!upConnectReq.getDownLinkIp().equals(data.getIpWhitelist())) {
                upConnectRsp.setResult(0x01);
            } else if (!data.getPassword().equals(upConnectReq.getPassword())) {
                upConnectRsp.setResult(0x04);
            } else if (outerPacket.getGnsscenterId() != Integer.valueOf(data.getCode())) {
                upConnectRsp.setResult(0x02);
            } else {
                upConnectRsp.setResult(0x00);
            }
        }
        // 随机一个校验码
        int verifyCode = new Random().nextInt(9999);
        upConnectRsp.setVerifyCode(verifyCode);
        //主链路登录应答
        log.info("主链路登录应答({}):{}", DataType.UP_CONNECT_RSP.getCode(), JSON.toJSONString(upConnectRsp));
        byte[] body = upConnectRsp.encode();
        OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(), body);
        ctx.writeAndFlush(out);
        ctx.flush();
        if (upConnectRsp.getResult() == 0x00) {
            //保存链路
            ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel());
            //从链路连接
//            downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
        } else {
            ctx.close();
        }
    }
    
    
    /**
     * 从链路连接
     *
     * @param inferiorPlatformId 企业唯一标识
     * @param host               连接IP
     * @param port               连接端口
     */
    public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) {
        try {
            boolean b = downConnect(inferiorPlatformId, host, port, verifyCode);
            if (b) {
                downLinkTest(inferiorPlatformId);
            }
        } catch (Exception e) {
            downDisconnectInform(ctx, 0x00);
            throw new RuntimeException(e);
        }
    }
    
    
    /**
     * 从链路连接
     *
     * @param inferiorPlatformId 企业唯一标识
     * @param host               连接IP
     * @param port               连接端口
     */
    public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) {
        try {
            new NettyClient(host, port).start(inferiorPlatformId);
            //构建从链路请求
            DOWNConnectReq downConnectReq = new DOWNConnectReq();
            downConnectReq.setVerifyCode(verifyCode);
            //获取从链路通道
            Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
            if (null != channel && channel.isActive()) {
                log.info("从链路连接请求({}):{}", DataType.DOWN_CONNECT_REQ.getCode(), JSON.toJSONString(downConnectReq));
                byte[] body = downConnectReq.encode();
                OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
                channel.writeAndFlush(out);
                channel.flush();
                
                //缓存从链路地址
                UPConnect upConnect = new UPConnect();
                upConnect.setDownLinkIp(host);
                upConnect.setDownLinkPort(port);
                upConnect.setVerifyCode(verifyCode);
                ChannelMap.addIpAndPort(inferiorPlatformId, upConnect);
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    
    
    /**
     * 从链路连接保持请求
     *
     * @param inferiorPlatformId
     */
    public void downLinkTest(int inferiorPlatformId) {
        //创建定时任务间隔发送链接保持请求
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //获取从链路通道
                Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
                if (null != channel && channel.isActive()) {
                    log.info("从链路连接保持请求({}):{}", DataType.DOWN_LINKTEST_REQ.getCode(), "");
                    OuterPacket out = new OuterPacket(DataType.DOWN_LINKTEST_REQ.getCode(), null);
                    channel.writeAndFlush(out);
                    channel.flush();
                } else {
                    //记录失败次数,然后再重新连接
                    int times = ChannelMap.getTimes(inferiorPlatformId);
                    if (times >= 18) {
                        UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
                        boolean b = downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
                        if (b) {
                            times = 0;
                        } else {
                            times++;
                        }
                    } else {
                        times++;
                    }
                    ChannelMap.saveTimes(inferiorPlatformId, times);
                }
            }
        }, 10, 10, TimeUnit.SECONDS);
    }
    
    
    /**
     * 从链路断开通知
     *
     * @param ctx
     * @param errorCode
     */
    public void downDisconnectInform(ChannelHandlerContext ctx, int errorCode) {
        DOWNDisconnectInform downDisconnectInform = new DOWNDisconnectInform();
        downDisconnectInform.setErrorCode(errorCode);
        log.info("从链路断开通知({}):{}", DataType.DOWN_DISCONNECT_INFORM.getCode(), JSON.toJSONString(downDisconnectInform));
        byte[] body = downDisconnectInform.encode();
        OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), body);
        ctx.writeAndFlush(out);
        ctx.flush();
    }
}