fix
杨锴
2025-06-16 3fa53409f5132333ce6d83fff796e108ddd62090
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//
//  Concat.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 3/21/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
extension ObservableType {
 
    /**
     Concatenates the second observable sequence to `self` upon successful termination of `self`.
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - parameter second: Second observable sequence.
     - returns: An observable sequence that contains the elements of `self`, followed by those of the second sequence.
     */
    public func concat<Source: ObservableConvertibleType>(_ second: Source) -> Observable<Element> where Source.Element == Element {
        Observable.concat([self.asObservable(), second.asObservable()])
    }
}
 
extension ObservableType {
    /**
     Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully.
 
     This operator has tail recursive optimizations that will prevent stack overflow.
 
     Optimizations will be performed in cases equivalent to following:
 
     [1, [2, [3, .....].concat()].concat].concat()
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - returns: An observable sequence that contains the elements of each given sequence, in sequential order.
     */
    public static func concat<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element == Observable<Element> {
            return Concat(sources: sequence, count: nil)
    }
 
    /**
     Concatenates all observable sequences in the given collection, as long as the previous observable sequence terminated successfully.
 
     This operator has tail recursive optimizations that will prevent stack overflow.
 
     Optimizations will be performed in cases equivalent to following:
 
     [1, [2, [3, .....].concat()].concat].concat()
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - returns: An observable sequence that contains the elements of each given sequence, in sequential order.
     */
    public static func concat<Collection: Swift.Collection>(_ collection: Collection) -> Observable<Element>
        where Collection.Element == Observable<Element> {
            return Concat(sources: collection, count: Int64(collection.count))
    }
 
    /**
     Concatenates all observable sequences in the given collection, as long as the previous observable sequence terminated successfully.
 
     This operator has tail recursive optimizations that will prevent stack overflow.
 
     Optimizations will be performed in cases equivalent to following:
 
     [1, [2, [3, .....].concat()].concat].concat()
 
     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)
 
     - returns: An observable sequence that contains the elements of each given sequence, in sequential order.
     */
    public static func concat(_ sources: Observable<Element> ...) -> Observable<Element> {
        Concat(sources: sources, count: Int64(sources.count))
    }
}
 
final private class ConcatSink<Sequence: Swift.Sequence, Observer: ObserverType>
    : TailRecursiveSink<Sequence, Observer>
    , ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
    typealias Element = Observer.Element 
    
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }
 
    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }
    
    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil
        }
    }
}
 
final private class Concat<Sequence: Swift.Sequence>: Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
    typealias Element = Sequence.Element.Element
    
    fileprivate let sources: Sequence
    fileprivate let count: IntMax?
 
    init(sources: Sequence, count: IntMax?) {
        self.sources = sources
        self.count = count
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ConcatSink<Sequence, Observer>(observer: observer, cancel: cancel)
        let subscription = sink.run((self.sources.makeIterator(), self.count))
        return (sink: sink, subscription: subscription)
    }
}