杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//
//  Sample.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 5/1/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
extension ObservableType {
 
    /**
     Samples the source observable sequence using a sampler observable sequence producing sampling ticks.
 
     Upon each sampling tick, the latest element (if any) in the source sequence during the last sampling interval is sent to the resulting sequence.
 
     **In case there were no new elements between sampler ticks, you may provide a default value to be emitted, instead
       to the resulting sequence otherwise no element is sent.**
 
     - seealso: [sample operator on reactivex.io](http://reactivex.io/documentation/operators/sample.html)
 
     - parameter sampler: Sampling tick sequence.
     - parameter defaultValue: a value to return if there are no new elements between sampler ticks
     - returns: Sampled observable sequence.
     */
    public func sample<Source: ObservableType>(_ sampler: Source, defaultValue: Element? = nil)
        -> Observable<Element> {
            return Sample(source: self.asObservable(), sampler: sampler.asObservable(), defaultValue: defaultValue)
    }
}
 
final private class SamplerSink<Observer: ObserverType, SampleType>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = SampleType
    
    typealias Parent = SampleSequenceSink<Observer, SampleType>
    
    private let parent: Parent
 
    var lock: RecursiveLock {
        self.parent.lock
    }
    
    init(parent: Parent) {
        self.parent = parent
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next, .completed:
            if let element = parent.element ?? self.parent.defaultValue {
                self.parent.element = nil
                self.parent.forwardOn(.next(element))
            }
 
            if self.parent.atEnd {
                self.parent.forwardOn(.completed)
                self.parent.dispose()
            }
        case .error(let e):
            self.parent.forwardOn(.error(e))
            self.parent.dispose()
        }
    }
}
 
final private class SampleSequenceSink<Observer: ObserverType, SampleType>
    : Sink<Observer>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = Observer.Element 
    typealias Parent = Sample<Element, SampleType>
    
    fileprivate let parent: Parent
    fileprivate let defaultValue: Element?
 
    let lock = RecursiveLock()
    
    // state
    fileprivate var element = nil as Element?
    fileprivate var atEnd = false
    
    private let sourceSubscription = SingleAssignmentDisposable()
    
    init(parent: Parent, observer: Observer, cancel: Cancelable, defaultValue: Element? = nil) {
        self.parent = parent
        self.defaultValue = defaultValue
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        self.sourceSubscription.setDisposable(self.parent.source.subscribe(self))
        let samplerSubscription = self.parent.sampler.subscribe(SamplerSink(parent: self))
        
        return Disposables.create(sourceSubscription, samplerSubscription)
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            self.element = element
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.atEnd = true
            self.sourceSubscription.dispose()
        }
    }
    
}
 
final private class Sample<Element, SampleType>: Producer<Element> {
    fileprivate let source: Observable<Element>
    fileprivate let sampler: Observable<SampleType>
    fileprivate let defaultValue: Element?
    
    init(source: Observable<Element>, sampler: Observable<SampleType>, defaultValue: Element? = nil) {
        self.source = source
        self.sampler = sampler
        self.defaultValue = defaultValue
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel, defaultValue: self.defaultValue)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}