// Repository-viewer residue (blame header and line-number gutter) condensed into a comment:
// last change by puzhibing, commit ea7595c4c75926f85388574b261b8ba90cf60e0d (shown as "2 days ago" when captured).
package com.ruoyi.dataInterchange.server;
 
import com.alibaba.fastjson.JSON;
import com.ruoyi.dataInterchange.model.DOWNDisconnectInform;
import com.ruoyi.dataInterchange.model.UPConnectReq;
import com.ruoyi.dataInterchange.model.UPConnectRsp;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.netty.client.NettyClient;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.system.api.feignClient.EnterpriseClient;
import com.ruoyi.system.api.model.Enterprise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * @author zhibing.pu
 * @Date 2025/3/4 18:40
 */
@Slf4j
@Component
public class ConnectReqService {
    
    @Resource
    private EnterpriseClient enterpriseClient;
    
    @Resource
    private RedisTemplate redisTemplate;
    
    
    /**
     * 主链路登录请求业务逻辑
     *
     * @param ctx
     */
    public void connect(ChannelHandlerContext ctx, OuterPacket outerPacket) {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(outerPacket.getBody());
        //解析封装原元数据
        UPConnectReq upConnectReq = new UPConnectReq().decode(byteBuf);
        log.info("主链路登录请求体:" + JSON.toJSONString(upConnectReq));
        Enterprise data = enterpriseClient.getEnterprise(upConnectReq.getUserId() + "").getData();
        UPConnectRsp upConnectRsp = new UPConnectRsp();
        if (null == data) {
            //用户没有注册
            upConnectRsp.setResult(0x03);
        } else {
            //IP地址不对
            if (!upConnectReq.getDownLinkIp().equals(data.getIpWhitelist())) {
                upConnectRsp.setResult(0x01);
            } else if (!data.getPassword().equals(upConnectReq.getPassword())) {
                upConnectRsp.setResult(0x04);
            } else if (outerPacket.getGnsscenterId() != Integer.valueOf(data.getCode())) {
                upConnectRsp.setResult(0x02);
            } else {
                upConnectRsp.setResult(0x00);
            }
        }
        // 随机一个校验码
        int verifyCode = new Random().nextInt(9999);
        upConnectRsp.setVerifyCode(verifyCode);
        //主链路登录应答
        log.info("主链路登录应答({}):{}", DataType.UP_CONNECT_RSP.getCode(), JSON.toJSONString(upConnectRsp));
        byte[] body = upConnectRsp.encode();
        OuterPacket out = new OuterPacket(DataType.UP_CONNECT_RSP.getCode(),outerPacket.getGnsscenterId() , body);
        ctx.writeAndFlush(out);
        ctx.flush();
        if (upConnectRsp.getResult() == 0x00) {
            redisTemplate.opsForValue().set("login:" + outerPacket.getGnsscenterId(), System.currentTimeMillis(), 1, TimeUnit.MINUTES);
            //保存链路
            ChannelMap.addServerChannel(outerPacket.getGnsscenterId(), ctx.channel());
            //从链路连接
            downConnect(ctx, outerPacket.getGnsscenterId(), upConnectReq.getDownLinkIp(), upConnectReq.getDownLinkPort(), verifyCode);
        } else {
            ctx.close();
        }
    }
    
    
    /**
     * 从链路连接
     *
     * @param inferiorPlatformId 企业唯一标识
     * @param host               连接IP
     * @param port               连接端口
     */
    public void downConnect(ChannelHandlerContext ctx, int inferiorPlatformId, String host, int port, int verifyCode) {
        try {
            boolean b = downConnect(inferiorPlatformId, host, port, verifyCode);
        } catch (Exception e) {
            downDisconnectInform(ctx, inferiorPlatformId, 0x00);
            throw new RuntimeException(e);
        }
    }
    
    
    /**
     * 从链路连接
     *
     * @param inferiorPlatformId 企业唯一标识
     * @param host               连接IP
     * @param port               连接端口
     */
    public boolean downConnect(int inferiorPlatformId, String host, int port, int verifyCode) {
        try {
            ExecutorService executorService = new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        new NettyClient(host, port).start(inferiorPlatformId, verifyCode);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    
    
    /**
     * 从链路断开通知
     *
     * @param ctx
     * @param errorCode
     */
    public void downDisconnectInform(ChannelHandlerContext ctx, int inferiorPlatformId, int errorCode) {
        DOWNDisconnectInform downDisconnectInform = new DOWNDisconnectInform();
        downDisconnectInform.setErrorCode(errorCode);
        log.info("从链路断开通知({}):{}", DataType.DOWN_DISCONNECT_INFORM.getCode(), JSON.toJSONString(downDisconnectInform));
        byte[] body = downDisconnectInform.encode();
        OuterPacket out = new OuterPacket(DataType.DOWN_CONNECT_REQ.getCode(), inferiorPlatformId, body);
        ctx.writeAndFlush(out);
        ctx.flush();
    }
}