杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//
//  Zip.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 5/23/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
protocol ZipSinkProtocol: AnyObject {
    func next(_ index: Int)
    func fail(_ error: Swift.Error)
    func done(_ index: Int)
}
 
class ZipSink<Observer: ObserverType> : Sink<Observer>, ZipSinkProtocol {
    typealias Element = Observer.Element
    
    let arity: Int
 
    let lock = RecursiveLock()
 
    // state
    private var isDone: [Bool]
    
    init(arity: Int, observer: Observer, cancel: Cancelable) {
        self.isDone = [Bool](repeating: false, count: arity)
        self.arity = arity
        
        super.init(observer: observer, cancel: cancel)
    }
 
    func getResult() throws -> Element {
        rxAbstractMethod()
    }
    
    func hasElements(_ index: Int) -> Bool {
        rxAbstractMethod()
    }
    
    func next(_ index: Int) {
        var hasValueAll = true
        
        for i in 0 ..< self.arity {
            if !self.hasElements(i) {
                hasValueAll = false
                break
            }
        }
        
        if hasValueAll {
            do {
                let result = try self.getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }
    
    func fail(_ error: Swift.Error) {
        self.forwardOn(.error(error))
        self.dispose()
    }
    
    func done(_ index: Int) {
        self.isDone[index] = true
        
        var allDone = true
        
        for done in self.isDone where !done {
            allDone = false
            break
        }
        
        if allDone {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}
 
final class ZipObserver<Element>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias ValueSetter = (Element) -> Void
 
    private var parent: ZipSinkProtocol?
    
    let lock: RecursiveLock
    
    // state
    private let index: Int
    private let this: Disposable
    private let setNextValue: ValueSetter
    
    init(lock: RecursiveLock, parent: ZipSinkProtocol, index: Int, setNextValue: @escaping ValueSetter, this: Disposable) {
        self.lock = lock
        self.parent = parent
        self.index = index
        self.this = this
        self.setNextValue = setNextValue
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func synchronized_on(_ event: Event<Element>) {
        if self.parent != nil {
            switch event {
            case .next:
                break
            case .error:
                self.this.dispose()
            case .completed:
                self.this.dispose()
            }
        }
        
        if let parent = self.parent {
            switch event {
            case .next(let value):
                self.setNextValue(value)
                parent.next(self.index)
            case .error(let error):
                parent.fail(error)
            case .completed:
                parent.done(self.index)
            }
        }
    }
}