杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
//
//  Throttle.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 3/22/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
import Foundation
 
extension ObservableType {
 
    /**
     Returns an Observable that emits the first and the latest item emitted by the source Observable during sequential time windows of a specified duration.
 
     This operator makes sure that no two elements are emitted in less then dueTime.
 
     - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html)
 
     - parameter dueTime: Throttling duration for each element.
     - parameter latest: Should latest element received in a dueTime wide time window since last element emission be emitted.
     - parameter scheduler: Scheduler to run the throttle timers on.
     - returns: The throttled sequence.
     */
    public func throttle(_ dueTime: RxTimeInterval, latest: Bool = true, scheduler: SchedulerType)
        -> Observable<Element> {
        Throttle(source: self.asObservable(), dueTime: dueTime, latest: latest, scheduler: scheduler)
    }
}
 
final private class ThrottleSink<Observer: ObserverType>
    : Sink<Observer>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = Observer.Element 
    typealias ParentType = Throttle<Element>
    
    private let parent: ParentType
    
    let lock = RecursiveLock()
    
    // state
    private var lastUnsentElement: Element?
    private var lastSentTime: Date?
    private var completed: Bool = false
 
    let cancellable = SerialDisposable()
    
    init(parent: ParentType, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        let subscription = self.parent.source.subscribe(self)
        
        return Disposables.create(subscription, cancellable)
    }
 
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            let now = self.parent.scheduler.now
 
            let reducedScheduledTime: RxTimeInterval
 
            if let lastSendingTime = self.lastSentTime {
                reducedScheduledTime = self.parent.dueTime.reduceWithSpanBetween(earlierDate: lastSendingTime, laterDate: now)
            }
            else {
                reducedScheduledTime = .nanoseconds(0)
            }
 
            if reducedScheduledTime.isNow {
                self.sendNow(element: element)
                return
            }
 
            if !self.parent.latest {
                return
            }
 
            let isThereAlreadyInFlightRequest = self.lastUnsentElement != nil
            
            self.lastUnsentElement = element
 
            if isThereAlreadyInFlightRequest {
                return
            }
 
            let scheduler = self.parent.scheduler
 
            let d = SingleAssignmentDisposable()
            self.cancellable.disposable = d
 
            d.setDisposable(scheduler.scheduleRelative(0, dueTime: reducedScheduledTime, action: self.propagate))
        case .error:
            self.lastUnsentElement = nil
            self.forwardOn(event)
            self.dispose()
        case .completed:
            if self.lastUnsentElement != nil {
                self.completed = true
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
 
    private func sendNow(element: Element) {
        self.lastUnsentElement = nil
        self.forwardOn(.next(element))
        // in case element processing takes a while, this should give some more room
        self.lastSentTime = self.parent.scheduler.now
    }
    
    func propagate(_: Int) -> Disposable {
        self.lock.performLocked {
            if let lastUnsentElement = self.lastUnsentElement {
                self.sendNow(element: lastUnsentElement)
            }
 
            if self.completed {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
 
        return Disposables.create()
    }
}
 
final private class Throttle<Element>: Producer<Element> {
    fileprivate let source: Observable<Element>
    fileprivate let dueTime: RxTimeInterval
    fileprivate let latest: Bool
    fileprivate let scheduler: SchedulerType
 
    init(source: Observable<Element>, dueTime: RxTimeInterval, latest: Bool, scheduler: SchedulerType) {
        self.source = source
        self.dueTime = dueTime
        self.latest = latest
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ThrottleSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
    
}