杨锴
2025-04-16 09a372bc45fde16fd42257ab6f78b8deeecf720b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//
//  Completable+AndThen.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 7/2/17.
//  Copyright © 2017 Krunoslav Zaher. All rights reserved.
//
 
extension PrimitiveSequenceType where Trait == CompletableTrait, Element == Never {
    /**
     Concatenates the second observable sequence to `self` upon successful termination of `self`.
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - parameter second: Second observable sequence.
     - returns: An observable sequence that contains the elements of `self`, followed by those of the second sequence.
     */
    public func andThen<Element>(_ second: Single<Element>) -> Single<Element> {
        let completable = self.primitiveSequence.asObservable()
        return Single(raw: ConcatCompletable(completable: completable, second: second.asObservable()))
    }
 
    /**
     Concatenates the second observable sequence to `self` upon successful termination of `self`.
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - parameter second: Second observable sequence.
     - returns: An observable sequence that contains the elements of `self`, followed by those of the second sequence.
     */
    public func andThen<Element>(_ second: Maybe<Element>) -> Maybe<Element> {
        let completable = self.primitiveSequence.asObservable()
        return Maybe(raw: ConcatCompletable(completable: completable, second: second.asObservable()))
    }
 
    /**
     Concatenates the second observable sequence to `self` upon successful termination of `self`.
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - parameter second: Second observable sequence.
     - returns: An observable sequence that contains the elements of `self`, followed by those of the second sequence.
     */
    public func andThen(_ second: Completable) -> Completable {
        let completable = self.primitiveSequence.asObservable()
        return Completable(raw: ConcatCompletable(completable: completable, second: second.asObservable()))
    }
 
    /**
     Concatenates the second observable sequence to `self` upon successful termination of `self`.
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - parameter second: Second observable sequence.
     - returns: An observable sequence that contains the elements of `self`, followed by those of the second sequence.
     */
    public func andThen<Element>(_ second: Observable<Element>) -> Observable<Element> {
        let completable = self.primitiveSequence.asObservable()
        return ConcatCompletable(completable: completable, second: second.asObservable())
    }
}
 
final private class ConcatCompletable<Element>: Producer<Element> {
    fileprivate let completable: Observable<Never>
    fileprivate let second: Observable<Element>
 
    init(completable: Observable<Never>, second: Observable<Element>) {
        self.completable = completable
        self.second = second
    }
 
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ConcatCompletableSink(second: second, observer: observer, cancel: cancel)
        let subscription = sink.run(completable: completable)
        return (sink: sink, subscription: subscription)
    }
}
 
final private class ConcatCompletableSink<Observer: ObserverType>
    : Sink<Observer>
    , ObserverType {
    typealias Element = Never
    typealias Parent = ConcatCompletable<Observer.Element>
 
    private let second: Observable<Observer.Element>
    private let subscription = SerialDisposable()
    
    init(second: Observable<Observer.Element>, observer: Observer, cancel: Cancelable) {
        self.second = second
        super.init(observer: observer, cancel: cancel)
    }
 
    func on(_ event: Event<Element>) {
        switch event {
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .next:
            break
        case .completed:
            let otherSink = ConcatCompletableSinkOther(parent: self)
            self.subscription.disposable = self.second.subscribe(otherSink)
        }
    }
 
    func run(completable: Observable<Never>) -> Disposable {
        let subscription = SingleAssignmentDisposable()
        self.subscription.disposable = subscription
        subscription.setDisposable(completable.subscribe(self))
        return self.subscription
    }
}
 
final private class ConcatCompletableSinkOther<Observer: ObserverType>
    : ObserverType {
    typealias Element = Observer.Element 
 
    typealias Parent = ConcatCompletableSink<Observer>
    
    private let parent: Parent
 
    init(parent: Parent) {
        self.parent = parent
    }
 
    func on(_ event: Event<Observer.Element>) {
        self.parent.forwardOn(event)
        if event.isStopEvent {
            self.parent.dispose()
        }
    }
}