fix
杨锴
2025-06-16 3fa53409f5132333ce6d83fff796e108ddd62090
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//
//  Delay.swift
//  RxSwift
//
//  Created by tarunon on 2016/02/09.
//  Copyright © 2016 Krunoslav Zaher. All rights reserved.
//
 
import Foundation
 
extension ObservableType {

    /**
     Creates an observable sequence that re-emits the events of the source sequence after the given delay. Error events are the exception: they are forwarded without being delayed.

     - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html)

     - parameter dueTime: Relative amount of time by which the source events are shifted.
     - parameter scheduler: Scheduler on which the delay timers are run.
     - returns: An observable sequence whose events are the source events shifted in time by `dueTime`.
     */
    public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        let upstream = self.asObservable()
        return Delay(source: upstream, dueTime: dueTime, scheduler: scheduler)
    }
}
 
/// Sink backing the `delay` operator.
///
/// Non-error events received in `on(_:)` are stored in `queue` together with the
/// scheduler time at which they arrived, and are later forwarded to the observer
/// from `drainQueue`, which is run as a recursive scheduled action.
/// Error events are never queued: they clear the queue and are forwarded either
/// directly from `on(_:)` or by the drain loop if it is currently running.
final private class DelaySink<Observer: ObserverType>
    : Sink<Observer>
    , ObserverType {
    typealias Element = Observer.Element 
    typealias Source = Observable<Element>
    typealias DisposeKey = Bag<Disposable>.KeyType
    
    // Guards `active`, `running`, `errorEvent` and `queue`; every read/write of those
    // fields in this class happens between `lock()` and `unlock()`.
    private let lock = RecursiveLock()
 
    // Delay passed to the scheduler when arming the drain action.
    private let dueTime: RxTimeInterval
    // Scheduler used both for timing (`now`) and for running `drainQueue`.
    private let scheduler: SchedulerType
    
    // Holds the subscription to the source; disposed as soon as a stop event arrives.
    private let sourceSubscription = SingleAssignmentDisposable()
    // Holds the disposable of the recursive scheduled drain action.
    private let cancelable = SerialDisposable()
 
    // is scheduled some action
    // Set to true in `on(_:)` when an event is enqueued; reset to false in `drainQueue`
    // when it finds nothing left to forward or schedule.
    private var active = false
    // is "run loop" on different scheduler running
    // Set to true on entry to `drainQueue` (unless an error is already recorded) and reset
    // once the drain loop has no event to forward in the current iteration.
    private var running = false
    // Error received from the source, if any. Once set, `drainQueue` forwards it instead of
    // queued events (or returns immediately if it is already set on entry).
    private var errorEvent: Event<Element>?
 
    // state
    // Pending events paired with the scheduler time at which they were received.
    private var queue = Queue<(eventTime: RxTime, event: Event<Element>)>(capacity: 0)
    
    /// Creates the sink.
    ///
    /// - parameter observer: Observer that receives the delayed events.
    /// - parameter dueTime: Delay used when scheduling the drain action.
    /// - parameter scheduler: Scheduler the drain action is scheduled on.
    /// - parameter cancel: Cancelable passed through to `Sink`.
    init(observer: Observer, dueTime: RxTimeInterval, scheduler: SchedulerType, cancel: Cancelable) {
        self.dueTime = dueTime
        self.scheduler = scheduler
        super.init(observer: observer, cancel: cancel)
    }
 
    // All of these complications in this method are caused by the fact that 
    // error should be propagated immediately. Error can be potentially received on different
    // scheduler so this process needs to be synchronized somehow.
    //
    // Another complication is that scheduler is potentially concurrent so internal queue is used.
    //
    // Per invocation this method forwards at most one queued event:
    //   1. first loop iteration: dequeue one event (if any) and forward it;
    //   2. second loop iteration: peek at the next queued event and, if there is one,
    //      re-schedule this action for it; otherwise mark the sink as idle.
    // At the start of every iteration `errorEvent` is re-read under the lock, so an error
    // recorded by `on(_:)` while this loop is running is forwarded from here.
    func drainQueue(state: (), scheduler: AnyRecursiveScheduler<()>) {
        self.lock.lock()    
        let hasFailed = self.errorEvent != nil
        if !hasFailed {
            self.running = true
        }
        self.lock.unlock()  
 
        // An error was recorded before this action started; nothing is forwarded from here.
        if hasFailed {
            return
        }
 
        var ranAtLeastOnce = false
 
        while true {
            // Take a snapshot of the shared state under the lock; the observer is only
            // called after the lock has been released.
            self.lock.lock() 
            let errorEvent = self.errorEvent
 
            // Only the first iteration dequeues; later iterations only look at what is next.
            let eventToForwardImmediately = ranAtLeastOnce ? nil : self.queue.dequeue()?.event
            let nextEventToScheduleOriginalTime: Date? = ranAtLeastOnce && !self.queue.isEmpty ? self.queue.peek().eventTime : nil
 
            if errorEvent == nil {
                if eventToForwardImmediately != nil {
                    // An event is about to be forwarded below; `running` stays true.
                }
                else if nextEventToScheduleOriginalTime != nil {
                    // More events are pending: stop "running" but stay `active`, because
                    // the action is re-scheduled below.
                    self.running = false
                }
                else {
                    // Queue is empty: the sink becomes idle, so the next event received
                    // in `on(_:)` schedules a new drain action.
                    self.running = false
                    self.active = false
                }
            }
            self.lock.unlock() 
 
            if let errorEvent = errorEvent {
                // Error takes precedence over any dequeued event.
                self.forwardOn(errorEvent)
                self.dispose()
                return
            }
            else {
                if let eventToForwardImmediately = eventToForwardImmediately {
                    ranAtLeastOnce = true
                    self.forwardOn(eventToForwardImmediately)
                    if case .completed = eventToForwardImmediately {
                        self.dispose()
                        return
                    }
                }
                else if let nextEventToScheduleOriginalTime = nextEventToScheduleOriginalTime {
                    // NOTE(review): presumably this yields the remaining delay for the next event
                    // (`dueTime` minus the time elapsed since it was enqueued) — confirm against
                    // `reduceWithSpanBetween`.
                    scheduler.schedule((), dueTime: self.dueTime.reduceWithSpanBetween(earlierDate: nextEventToScheduleOriginalTime, laterDate: self.scheduler.now))
                    return
                }
                else {
                    return
                }
            }
        }
    }
    
    /// Receives an event from the source.
    ///
    /// - Stop events (error / completed) dispose the source subscription first.
    /// - `.error` discards all queued events and records the error; it is forwarded here
    ///   only if the drain loop is not running, otherwise the drain loop picks it up.
    /// - Any other event is enqueued with the current scheduler time, and the drain action
    ///   is scheduled if the sink was not already `active`.
    func on(_ event: Event<Element>) {
        if event.isStopEvent {
            self.sourceSubscription.dispose()
        }
 
        switch event {
        case .error:
            self.lock.lock()    
            // Decide under the lock who is responsible for forwarding the error.
            let shouldSendImmediately = !self.running
            // Drop everything that has not been delivered yet.
            self.queue = Queue(capacity: 0)
            self.errorEvent = event
            self.lock.unlock()  
 
            if shouldSendImmediately {
                self.forwardOn(event)
                self.dispose()
            }
        default:
            self.lock.lock()    
            // Only the event that flips `active` from false to true schedules the drain action.
            let shouldSchedule = !self.active
            self.active = true
            self.queue.enqueue((self.scheduler.now, event))
            self.lock.unlock()  
 
            if shouldSchedule {
                self.cancelable.disposable = self.scheduler.scheduleRecursive((), dueTime: self.dueTime, action: self.drainQueue)
            }
        }
    }
    
    /// Subscribes this sink to `source`.
    ///
    /// - parameter source: Observable whose events are delayed.
    /// - returns: Disposable combining the source subscription and the scheduled drain action.
    func run(source: Observable<Element>) -> Disposable {
        self.sourceSubscription.setDisposable(source.subscribe(self))
        return Disposables.create(sourceSubscription, cancelable)
    }
}
 
/// Producer for the `delay` operator: for each subscription it creates a `DelaySink`
/// and runs it against the stored source.
final private class Delay<Element>: Producer<Element> {
    /// Sequence whose events are delayed.
    private let source: Observable<Element>
    /// Delay handed to every sink created by this producer.
    private let dueTime: RxTimeInterval
    /// Scheduler handed to every sink created by this producer.
    private let scheduler: SchedulerType

    /// Stores the source sequence together with the delay configuration.
    ///
    /// - parameter source: Sequence whose events are delayed.
    /// - parameter dueTime: Delay passed to the sink.
    /// - parameter scheduler: Scheduler passed to the sink.
    init(source: Observable<Element>, dueTime: RxTimeInterval, scheduler: SchedulerType) {
        self.scheduler = scheduler
        self.dueTime = dueTime
        self.source = source
    }

    /// Builds a `DelaySink` for `observer` and starts it on the stored source.
    ///
    /// - returns: The sink itself and the disposable returned by running it.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let delaySink = DelaySink(observer: observer, dueTime: dueTime, scheduler: scheduler, cancel: cancel)
        return (sink: delaySink, subscription: delaySink.run(source: source))
    }
}