杨锴
2025-03-11 90dc3329d1973fda691e357cf4523d5c7c67fa1d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
//
//  VirtualTimeScheduler.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 2/14/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
import Foundation
 
/// Base class for virtual time schedulers using a priority queue for scheduled items.
open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
    : SchedulerType {
 
    public typealias VirtualTime = Converter.VirtualTimeUnit
    public typealias VirtualTimeInterval = Converter.VirtualTimeIntervalUnit
 
    private var running : Bool
 
    private var currentClock: VirtualTime
 
    private var schedulerQueue : PriorityQueue<VirtualSchedulerItem<VirtualTime>>
    private var converter: Converter
 
    private var nextId = 0
 
    private let thread: Thread
 
    /// - returns: Current time.
    public var now: RxTime {
        self.converter.convertFromVirtualTime(self.clock)
    }
 
    /// - returns: Scheduler's absolute time clock value.
    public var clock: VirtualTime {
        self.currentClock
    }
 
    /// Creates a new virtual time scheduler.
    ///
    /// - parameter initialClock: Initial value for the clock.
    public init(initialClock: VirtualTime, converter: Converter) {
        self.currentClock = initialClock
        self.running = false
        self.converter = converter
        self.thread = Thread.current
        self.schedulerQueue = PriorityQueue(hasHigherPriority: {
            switch converter.compareVirtualTime($0.time, $1.time) {
            case .lessThan:
                return true
            case .equal:
                return $0.id < $1.id
            case .greaterThan:
                return false
            }
        }, isEqual: { $0 === $1 })
        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
        #endif
    }
 
    /**
    Schedules an action to be executed immediately.
 
    - parameter state: State passed to the action to be executed.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.scheduleRelative(state, dueTime: .microseconds(0)) { a in
            return action(a)
        }
    }
 
    /**
     Schedules an action to be executed.
 
     - parameter state: State passed to the action to be executed.
     - parameter dueTime: Relative time after which to execute the action.
     - parameter action: Action to be executed.
     - returns: The disposable object used to cancel the scheduled action (best effort).
     */
    public func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
        let time = self.now.addingDispatchInterval(dueTime)
        let absoluteTime = self.converter.convertToVirtualTime(time)
        let adjustedTime = self.adjustScheduledTime(absoluteTime)
        return self.scheduleAbsoluteVirtual(state, time: adjustedTime, action: action)
    }
 
    /**
     Schedules an action to be executed after relative time has passed.
 
     - parameter state: State passed to the action to be executed.
     - parameter time: Absolute time when to execute the action. If this is less or equal then `now`, `now + 1`  will be used.
     - parameter action: Action to be executed.
     - returns: The disposable object used to cancel the scheduled action (best effort).
     */
    public func scheduleRelativeVirtual<StateType>(_ state: StateType, dueTime: VirtualTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
        let time = self.converter.offsetVirtualTime(self.clock, offset: dueTime)
        return self.scheduleAbsoluteVirtual(state, time: time, action: action)
    }
 
    /**
     Schedules an action to be executed at absolute virtual time.
 
     - parameter state: State passed to the action to be executed.
     - parameter time: Absolute time when to execute the action.
     - parameter action: Action to be executed.
     - returns: The disposable object used to cancel the scheduled action (best effort).
     */
    public func scheduleAbsoluteVirtual<StateType>(_ state: StateType, time: VirtualTime, action: @escaping (StateType) -> Disposable) -> Disposable {
        ensusreRunningOnCorrectThread()
        let compositeDisposable = CompositeDisposable()
 
        let item = VirtualSchedulerItem(action: {
            return action(state)
        }, time: time, id: self.nextId)
 
        self.nextId += 1
 
        self.schedulerQueue.enqueue(item)
        
        _ = compositeDisposable.insert(item)
        
        return compositeDisposable
    }
 
    /// Adjusts time of scheduling before adding item to schedule queue.
    open func adjustScheduledTime(_ time: VirtualTime) -> VirtualTime {
        time
    }
 
    /// Starts the virtual time scheduler.
    public func start() {
        if self.running {
            return
        }
 
        ensusreRunningOnCorrectThread()
        self.running = true
        repeat {
            guard let next = self.findNext() else {
                break
            }
 
            if self.converter.compareVirtualTime(next.time, self.clock).greaterThan {
                self.currentClock = next.time
            }
 
            next.invoke()
            self.schedulerQueue.remove(next)
        } while self.running
 
        self.running = false
    }
 
    func findNext() -> VirtualSchedulerItem<VirtualTime>? {
        while let front = self.schedulerQueue.peek() {
            if front.isDisposed {
                self.schedulerQueue.remove(front)
                continue
            }
 
            return front
        }
 
        return nil
    }
 
    /// Advances the scheduler's clock to the specified time, running all work till that point.
    ///
    /// - parameter virtualTime: Absolute time to advance the scheduler's clock to.
    public func advanceTo(_ virtualTime: VirtualTime) {
        if self.running {
            fatalError("Scheduler is already running")
        }
 
        ensusreRunningOnCorrectThread()
        self.running = true
        repeat {
            guard let next = self.findNext() else {
                break
            }
 
            if self.converter.compareVirtualTime(next.time, virtualTime).greaterThan {
                break
            }
 
            if self.converter.compareVirtualTime(next.time, self.clock).greaterThan {
                self.currentClock = next.time
            }
            next.invoke()
            self.schedulerQueue.remove(next)
        } while self.running
 
        self.currentClock = virtualTime
        self.running = false
    }
 
    /// Advances the scheduler's clock by the specified relative time.
    public func sleep(_ virtualInterval: VirtualTimeInterval) {
        ensusreRunningOnCorrectThread()
        let sleepTo = self.converter.offsetVirtualTime(self.clock, offset: virtualInterval)
        if self.converter.compareVirtualTime(sleepTo, self.clock).lessThen {
            fatalError("Can't sleep to past.")
        }
 
        self.currentClock = sleepTo
    }
 
    /// Stops the virtual time scheduler.
    public func stop() {
        ensusreRunningOnCorrectThread()
        self.running = false
    }
 
    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
 
    private func ensusreRunningOnCorrectThread() {
        guard Thread.current == thread else {
            rxFatalError("Executing on the wrong thread. Please ensure all work on the same thread.")
        }
    }
}
 
// MARK: description
 
extension VirtualTimeScheduler: CustomDebugStringConvertible {
    /// A textual representation of `self`, suitable for debugging.
    public var debugDescription: String {
        self.schedulerQueue.debugDescription
    }
}
 
final class VirtualSchedulerItem<Time>
    : Disposable {
    typealias Action = () -> Disposable
    
    let action: Action
    let time: Time
    let id: Int
 
    var isDisposed: Bool {
        self.disposable.isDisposed
    }
    
    var disposable = SingleAssignmentDisposable()
    
    init(action: @escaping Action, time: Time, id: Int) {
        self.action = action
        self.time = time
        self.id = id
    }
 
    func invoke() {
         self.disposable.setDisposable(self.action())
    }
    
    func dispose() {
        self.disposable.dispose()
    }
}
 
extension VirtualSchedulerItem
    : CustomDebugStringConvertible {
    var debugDescription: String {
        "\(self.time)"
    }
}