杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//
//  Debounce.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 9/11/16.
//  Copyright © 2016 Krunoslav Zaher. All rights reserved.
//
 
import Foundation
 
extension ObservableType {
 
    /**
     Ignores elements from an observable sequence which are followed by another element within a specified relative time duration, using the specified scheduler to run throttling timers.
 
     - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html)
 
     - parameter dueTime: Throttling duration for each element.
     - parameter scheduler: Scheduler to run the throttle timers on.
     - returns: The throttled sequence.
     */
    public func debounce(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
            return Debounce(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
    }
}
 
final private class DebounceSink<Observer: ObserverType>
    : Sink<Observer>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = Observer.Element 
    typealias ParentType = Debounce<Element>
 
    private let parent: ParentType
 
    let lock = RecursiveLock()
 
    // state
    private var id = 0 as UInt64
    private var value: Element?
 
    let cancellable = SerialDisposable()
 
    init(parent: ParentType, observer: Observer, cancel: Cancelable) {
        self.parent = parent
 
        super.init(observer: observer, cancel: cancel)
    }
 
    func run() -> Disposable {
        let subscription = self.parent.source.subscribe(self)
 
        return Disposables.create(subscription, cancellable)
    }
 
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }
 
    func synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            self.id = self.id &+ 1
            let currentId = self.id
            self.value = element
 
 
            let scheduler = self.parent.scheduler
            let dueTime = self.parent.dueTime
 
            let d = SingleAssignmentDisposable()
            self.cancellable.disposable = d
            d.setDisposable(scheduler.scheduleRelative(currentId, dueTime: dueTime, action: self.propagate))
        case .error:
            self.value = nil
            self.forwardOn(event)
            self.dispose()
        case .completed:
            if let value = self.value {
                self.value = nil
                self.forwardOn(.next(value))
            }
            self.forwardOn(.completed)
            self.dispose()
        }
    }
 
    func propagate(_ currentId: UInt64) -> Disposable {
        self.lock.performLocked {
            let originalValue = self.value
 
            if let value = originalValue, self.id == currentId {
                self.value = nil
                self.forwardOn(.next(value))
            }
 
            return Disposables.create()
        }
    }
}
 
final private class Debounce<Element>: Producer<Element> {
    fileprivate let source: Observable<Element>
    fileprivate let dueTime: RxTimeInterval
    fileprivate let scheduler: SchedulerType
 
    init(source: Observable<Element>, dueTime: RxTimeInterval, scheduler: SchedulerType) {
        self.source = source
        self.dueTime = dueTime
        self.scheduler = scheduler
    }
 
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = DebounceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
    
}