宽窄优行-由【嘉易行】项目成品而来
younger_times
2023-04-06 a1ae6802080a22e6e6ce6d0935e95facb1daca5c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//
//  Zip.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 5/23/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
protocol ZipSinkProtocol : class
{
    func next(_ index: Int)
    func fail(_ error: Swift.Error)
    func done(_ index: Int)
}
 
class ZipSink<Observer: ObserverType> : Sink<Observer>, ZipSinkProtocol {
    typealias Element = Observer.Element
    
    let _arity: Int
 
    let _lock = RecursiveLock()
 
    // state
    private var _isDone: [Bool]
    
    init(arity: Int, observer: Observer, cancel: Cancelable) {
        self._isDone = [Bool](repeating: false, count: arity)
        self._arity = arity
        
        super.init(observer: observer, cancel: cancel)
    }
 
    func getResult() throws -> Element {
        rxAbstractMethod()
    }
    
    func hasElements(_ index: Int) -> Bool {
        rxAbstractMethod()
    }
    
    func next(_ index: Int) {
        var hasValueAll = true
        
        for i in 0 ..< self._arity {
            if !self.hasElements(i) {
                hasValueAll = false
                break
            }
        }
        
        if hasValueAll {
            do {
                let result = try self.getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }
    
    func fail(_ error: Swift.Error) {
        self.forwardOn(.error(error))
        self.dispose()
    }
    
    func done(_ index: Int) {
        self._isDone[index] = true
        
        var allDone = true
        
        for done in self._isDone where !done {
            allDone = false
            break
        }
        
        if allDone {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}
 
final class ZipObserver<Element>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias ValueSetter = (Element) -> Void
 
    private var _parent: ZipSinkProtocol?
    
    let _lock: RecursiveLock
    
    // state
    private let _index: Int
    private let _this: Disposable
    private let _setNextValue: ValueSetter
    
    init(lock: RecursiveLock, parent: ZipSinkProtocol, index: Int, setNextValue: @escaping ValueSetter, this: Disposable) {
        self._lock = lock
        self._parent = parent
        self._index = index
        self._this = this
        self._setNextValue = setNextValue
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func _synchronized_on(_ event: Event<Element>) {
        if self._parent != nil {
            switch event {
            case .next:
                break
            case .error:
                self._this.dispose()
            case .completed:
                self._this.dispose()
            }
        }
        
        if let parent = self._parent {
            switch event {
            case .next(let value):
                self._setNextValue(value)
                parent.next(self._index)
            case .error(let error):
                parent.fail(error)
            case .completed:
                parent.done(self._index)
            }
        }
    }
}