杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
//
//  SharedSequence.swift
//  RxCocoa
//
//  Created by Krunoslav Zaher on 8/27/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
import RxSwift
 
/**
    Trait that represents observable sequence that shares computation resources with following properties:
 
    - it never fails
    - it delivers events on `SharingStrategy.scheduler`
    - sharing strategy is customizable using `SharingStrategy.share` behavior
 
    `SharedSequence<Element>` can be considered a builder pattern for observable sequences that share computation resources.
 
    To find out more about units and how to use them, please visit `Documentation/Traits.md`.
*/
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType, ObservableConvertibleType {
    let source: Observable<Element>
 
    init(_ source: Observable<Element>) {
        self.source = SharingStrategy.share(source)
    }
 
    init(raw: Observable<Element>) {
        self.source = raw
    }
 
    #if EXPANDABLE_SHARED_SEQUENCE
    /**
     This method is extension hook in case this unit needs to extended from outside the library.
     
     By defining `EXPANDABLE_SHARED_SEQUENCE` one agrees that it's up to them to ensure shared sequence
     properties are preserved after extension.
    */
    public static func createUnsafe<Source: ObservableType>(source: Source) -> SharedSequence<SharingStrategy, Source.Element> {
        SharedSequence<SharingStrategy, Source.Element>(raw: source.asObservable())
    }
    #endif
 
    /**
    - returns: Built observable sequence.
    */
    public func asObservable() -> Observable<Element> {
        self.source
    }
 
    /**
    - returns: `self`
    */
    public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
        self
    }
    
    /// - returns: `Infallible` interface.
    public func asInfallible() -> Infallible<Element> {
        asInfallible(onErrorFallbackTo: .empty())
    }
}
 
/**
 Different `SharedSequence` sharing strategies must conform to this protocol.
 */
public protocol SharingStrategyProtocol {
    /**
     Scheduled on which all sequence events will be delivered.
    */
    static var scheduler: SchedulerType { get }
 
    /**
     Computation resources sharing strategy for multiple sequence observers.
     
     E.g. One can choose `share(replay:scope:)`
     as sequence event sharing strategies, but also do something more exotic, like
     implementing promises or lazy loading chains.
    */
    static func share<Element>(_ source: Observable<Element>) -> Observable<Element>
}
 
/**
A type that can be converted to `SharedSequence`.
*/
public protocol SharedSequenceConvertibleType : ObservableConvertibleType {
    associatedtype SharingStrategy: SharingStrategyProtocol
 
    /**
    Converts self to `SharedSequence`.
    */
    func asSharedSequence() -> SharedSequence<SharingStrategy, Element>
}
 
extension SharedSequenceConvertibleType {
    public func asObservable() -> Observable<Element> {
        self.asSharedSequence().asObservable()
    }
}
 
 
extension SharedSequence {
 
    /**
    Returns an empty observable sequence, using the specified scheduler to send out the single `Completed` message.
 
    - returns: An observable sequence with no elements.
    */
    public static func empty() -> SharedSequence<SharingStrategy, Element> {
        SharedSequence(raw: Observable.empty().subscribe(on: SharingStrategy.scheduler))
    }
 
    /**
    Returns a non-terminating observable sequence, which can be used to denote an infinite duration.
 
    - returns: An observable sequence whose observers will never get called.
    */
    public static func never() -> SharedSequence<SharingStrategy, Element> {
        SharedSequence(raw: Observable.never())
    }
 
    /**
    Returns an observable sequence that contains a single element.
 
    - parameter element: Single element in the resulting observable sequence.
    - returns: An observable sequence containing the single specified element.
    */
    public static func just(_ element: Element) -> SharedSequence<SharingStrategy, Element> {
        SharedSequence(raw: Observable.just(element).subscribe(on: SharingStrategy.scheduler))
    }
 
    /**
     Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
 
     - parameter observableFactory: Observable factory function to invoke for each observer that subscribes to the resulting sequence.
     - returns: An observable sequence whose observers trigger an invocation of the given observable factory function.
     */
    public static func deferred(_ observableFactory: @escaping () -> SharedSequence<SharingStrategy, Element>)
        -> SharedSequence<SharingStrategy, Element> {
        SharedSequence(Observable.deferred { observableFactory().asObservable() })
    }
 
    /**
    This method creates a new Observable instance with a variable number of elements.
 
    - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
 
    - parameter elements: Elements to generate.
    - returns: The observable sequence whose elements are pulled from the given arguments.
    */
    public static func of(_ elements: Element ...) -> SharedSequence<SharingStrategy, Element> {
        let source = Observable.from(elements, scheduler: SharingStrategy.scheduler)
        return SharedSequence(raw: source)
    }
}
 
extension SharedSequence {
    
    /**
    This method converts an array to an observable sequence.
     
    - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
     
    - returns: The observable sequence whose elements are pulled from the given enumerable sequence.
     */
    public static func from(_ array: [Element]) -> SharedSequence<SharingStrategy, Element> {
        let source = Observable.from(array, scheduler: SharingStrategy.scheduler)
        return SharedSequence(raw: source)
    }
    
    /**
     This method converts a sequence to an observable sequence.
     
     - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
     
     - returns: The observable sequence whose elements are pulled from the given enumerable sequence.
    */
    public static func from<Sequence: Swift.Sequence>(_ sequence: Sequence) -> SharedSequence<SharingStrategy, Element> where Sequence.Element == Element {
        let source = Observable.from(sequence, scheduler: SharingStrategy.scheduler)
        return SharedSequence(raw: source)
    }
    
    /**
     This method converts a optional to an observable sequence.
     
     - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
     
     - parameter optional: Optional element in the resulting observable sequence.
     
     - returns: An observable sequence containing the wrapped value or not from given optional.
     */
    public static func from(optional: Element?) -> SharedSequence<SharingStrategy, Element> {
        let source = Observable.from(optional: optional, scheduler: SharingStrategy.scheduler)
        return SharedSequence(raw: source)
    }
}
 
extension SharedSequence where Element: RxAbstractInteger {
    /**
     Returns an observable sequence that produces a value after each period, using the specified scheduler to run timers and to send out observer messages.
 
     - seealso: [interval operator on reactivex.io](http://reactivex.io/documentation/operators/interval.html)
 
     - parameter period: Period for producing the values in the resulting sequence.
     - returns: An observable sequence that produces a value after each period.
     */
    public static func interval(_ period: RxTimeInterval)
        -> SharedSequence<SharingStrategy, Element> {
        SharedSequence(Observable.interval(period, scheduler: SharingStrategy.scheduler))
    }
}
 
// MARK: timer
 
extension SharedSequence where Element: RxAbstractInteger {
    /**
     Returns an observable sequence that periodically produces a value after the specified initial relative due time has elapsed, using the specified scheduler to run timers.
 
     - seealso: [timer operator on reactivex.io](http://reactivex.io/documentation/operators/timer.html)
 
     - parameter dueTime: Relative time at which to produce the first value.
     - parameter period: Period to produce subsequent values.
     - returns: An observable sequence that produces a value after due time has elapsed and then each period.
     */
    public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval)
        -> SharedSequence<SharingStrategy, Element> {
        SharedSequence(Observable.timer(dueTime, period: period, scheduler: SharingStrategy.scheduler))
    }
}