杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//
//  Amb.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 6/14/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
extension ObservableType {
    /**
     Propagates the observable sequence that reacts first.
 
     - seealso: [amb operator on reactivex.io](http://reactivex.io/documentation/operators/amb.html)
 
     - returns: An observable sequence that surfaces any of the given sequences, whichever reacted first.
     */
    public static func amb<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element == Observable<Element> {
        sequence.reduce(Observable<Sequence.Element.Element>.never()) { a, o in
            a.amb(o.asObservable())
        }
    }
}
 
extension ObservableType {
 
    /**
     Propagates the observable sequence that reacts first.
 
     - seealso: [amb operator on reactivex.io](http://reactivex.io/documentation/operators/amb.html)
 
     - parameter right: Second observable sequence.
     - returns: An observable sequence that surfaces either of the given sequences, whichever reacted first.
     */
    public func amb<O2: ObservableType>
        (_ right: O2)
        -> Observable<Element> where O2.Element == Element {
        Amb(left: self.asObservable(), right: right.asObservable())
    }
}
 
private enum AmbState {
    case neither
    case left
    case right
}
 
final private class AmbObserver<Observer: ObserverType>: ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = AmbSink<Observer>
    typealias This = AmbObserver<Observer>
    typealias Sink = (This, Event<Element>) -> Void
    
    private let parent: Parent
    fileprivate var sink: Sink
    fileprivate var cancel: Disposable
    
    init(parent: Parent, cancel: Disposable, sink: @escaping Sink) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        
        self.parent = parent
        self.sink = sink
        self.cancel = cancel
    }
    
    func on(_ event: Event<Element>) {
        self.sink(self, event)
        if event.isStopEvent {
            self.cancel.dispose()
        }
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
}
 
final private class AmbSink<Observer: ObserverType>: Sink<Observer> {
    typealias Element = Observer.Element
    typealias Parent = Amb<Element>
    typealias AmbObserverType = AmbObserver<Observer>
 
    private let parent: Parent
    
    private let lock = RecursiveLock()
    // state
    private var choice = AmbState.neither
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()
        let disposeAll = Disposables.create(subscription1, subscription2)
        
        let forwardEvent = { (o: AmbObserverType, event: Event<Element>) -> Void in
            self.forwardOn(event)
            if event.isStopEvent {
                self.dispose()
            }
        }
 
        let decide = { (o: AmbObserverType, event: Event<Element>, me: AmbState, otherSubscription: Disposable) in
            self.lock.performLocked {
                if self.choice == .neither {
                    self.choice = me
                    o.sink = forwardEvent
                    o.cancel = disposeAll
                    otherSubscription.dispose()
                }
                
                if self.choice == me {
                    self.forwardOn(event)
                    if event.isStopEvent {
                        self.dispose()
                    }
                }
            }
        }
        
        let sink1 = AmbObserver(parent: self, cancel: subscription1) { o, e in
            decide(o, e, .left, subscription2)
        }
        
        let sink2 = AmbObserver(parent: self, cancel: subscription1) { o, e in
            decide(o, e, .right, subscription1)
        }
        
        subscription1.setDisposable(self.parent.left.subscribe(sink1))
        subscription2.setDisposable(self.parent.right.subscribe(sink2))
        
        return disposeAll
    }
}
 
final private class Amb<Element>: Producer<Element> {
    fileprivate let left: Observable<Element>
    fileprivate let right: Observable<Element>
    
    init(left: Observable<Element>, right: Observable<Element>) {
        self.left = left
        self.right = right
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AmbSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}