宽窄优行-由【嘉易行】项目成品而来
younger_times
2023-04-06 a1ae6802080a22e6e6ce6d0935e95facb1daca5c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//
//  Sample.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 5/1/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
extension ObservableType {
 
    /**
     Samples the source observable sequence using a sampler observable sequence producing sampling ticks.
 
     Upon each sampling tick, the latest element (if any) in the source sequence during the last sampling interval is sent to the resulting sequence.
 
     **In case there were no new elements between sampler ticks, no element is sent to the resulting sequence.**
 
     - seealso: [sample operator on reactivex.io](http://reactivex.io/documentation/operators/sample.html)
 
     - parameter sampler: Sampling tick sequence.
     - returns: Sampled observable sequence.
     */
    public func sample<Source: ObservableType>(_ sampler: Source)
        -> Observable<Element> {
            return Sample(source: self.asObservable(), sampler: sampler.asObservable())
    }
}
 
final private class SamplerSink<Observer: ObserverType, SampleType>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = SampleType
    
    typealias Parent = SampleSequenceSink<Observer, SampleType>
    
    private let _parent: Parent
 
    var _lock: RecursiveLock {
        return self._parent._lock
    }
    
    init(parent: Parent) {
        self._parent = parent
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next, .completed:
            if let element = _parent._element {
                self._parent._element = nil
                self._parent.forwardOn(.next(element))
            }
 
            if self._parent._atEnd {
                self._parent.forwardOn(.completed)
                self._parent.dispose()
            }
        case .error(let e):
            self._parent.forwardOn(.error(e))
            self._parent.dispose()
        }
    }
}
 
final private class SampleSequenceSink<Observer: ObserverType, SampleType>
    : Sink<Observer>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = Observer.Element 
    typealias Parent = Sample<Element, SampleType>
    
    private let _parent: Parent
 
    let _lock = RecursiveLock()
    
    // state
    fileprivate var _element = nil as Element?
    fileprivate var _atEnd = false
    
    private let _sourceSubscription = SingleAssignmentDisposable()
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        self._sourceSubscription.setDisposable(self._parent._source.subscribe(self))
        let samplerSubscription = self._parent._sampler.subscribe(SamplerSink(parent: self))
        
        return Disposables.create(_sourceSubscription, samplerSubscription)
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            self._element = element
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self._atEnd = true
            self._sourceSubscription.dispose()
        }
    }
    
}
 
final private class Sample<Element, SampleType>: Producer<Element> {
    fileprivate let _source: Observable<Element>
    fileprivate let _sampler: Observable<SampleType>
 
    init(source: Observable<Element>, sampler: Observable<SampleType>) {
        self._source = source
        self._sampler = sampler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}