杨锴
2025-04-16 09a372bc45fde16fd42257ab6f78b8deeecf720b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
//
//  Rx.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 2/14/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
#if TRACE_RESOURCES
    private let resourceCount = AtomicInt(0)
 
    /// Resource utilization information
    public struct Resources {
        /// Counts internal Rx resource allocations (Observables, Observers, Disposables, etc.). This provides a simple way to detect leaks during development.
        public static var total: Int32 {
            load(resourceCount)
        }
 
        /// Increments `Resources.total` resource count.
        ///
        /// - returns: New resource count
        public static func incrementTotal() -> Int32 {
            increment(resourceCount)
        }
 
        /// Decrements `Resources.total` resource count
        ///
        /// - returns: New resource count
        public static func decrementTotal() -> Int32 {
            decrement(resourceCount)
        }
    }
#endif
 
/// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
    rxFatalError("Abstract method", file: file, line: line)
}
 
func rxFatalError(_ lastMessage: @autoclosure () -> String, file: StaticString = #file, line: UInt = #line) -> Swift.Never  {
    fatalError(lastMessage(), file: file, line: line)
}
 
func rxFatalErrorInDebug(_ lastMessage: @autoclosure () -> String, file: StaticString = #file, line: UInt = #line) {
    #if DEBUG
        fatalError(lastMessage(), file: file, line: line)
    #else
        print("\(file):\(line): \(lastMessage())")
    #endif
}
 
func incrementChecked(_ i: inout Int) throws -> Int {
    if i == Int.max {
        throw RxError.overflow
    }
    defer { i += 1 }
    return i
}
 
func decrementChecked(_ i: inout Int) throws -> Int {
    if i == Int.min {
        throw RxError.overflow
    }
    defer { i -= 1 }
    return i
}
 
#if DEBUG
    import Foundation
    final class SynchronizationTracker {
        private let lock = RecursiveLock()
 
        public enum SynchronizationErrorMessages: String {
            case variable = "Two different threads are trying to assign the same `Variable.value` unsynchronized.\n    This is undefined behavior because the end result (variable value) is nondeterministic and depends on the \n    operating system thread scheduler. This will cause random behavior of your program.\n"
            case `default` = "Two different unsynchronized threads are trying to send some event simultaneously.\n    This is undefined behavior because the ordering of the effects caused by these events is nondeterministic and depends on the \n    operating system thread scheduler. This will result in a random behavior of your program.\n"
        }
 
        private var threads = [UnsafeMutableRawPointer: Int]()
 
        private func synchronizationError(_ message: String) {
            #if FATAL_SYNCHRONIZATION
                rxFatalError(message)
            #else
                print(message)
            #endif
        }
        
        func register(synchronizationErrorMessage: SynchronizationErrorMessages) {
            self.lock.lock(); defer { self.lock.unlock() }
            let pointer = Unmanaged.passUnretained(Thread.current).toOpaque()
            let count = (self.threads[pointer] ?? 0) + 1
 
            if count > 1 {
                self.synchronizationError(
                    "⚠️ Reentrancy anomaly was detected.\n" +
                    "  > Debugging: To debug this issue you can set a breakpoint in \(#file):\(#line) and observe the call stack.\n" +
                    "  > Problem: This behavior is breaking the observable sequence grammar. `next (error | completed)?`\n" +
                    "    This behavior breaks the grammar because there is overlapping between sequence events.\n" +
                    "    Observable sequence is trying to send an event before sending of previous event has finished.\n" +
                    "  > Interpretation: This could mean that there is some kind of unexpected cyclic dependency in your code,\n" +
                    "    or that the system is not behaving in the expected way.\n" +
                    "  > Remedy: If this is the expected behavior this message can be suppressed by adding `.observe(on:MainScheduler.asyncInstance)`\n" +
                    "    or by enqueuing sequence events in some other way.\n"
                )
            }
            
            self.threads[pointer] = count
 
            if self.threads.count > 1 {
                self.synchronizationError(
                    "⚠️ Synchronization anomaly was detected.\n" +
                    "  > Debugging: To debug this issue you can set a breakpoint in \(#file):\(#line) and observe the call stack.\n" +
                    "  > Problem: This behavior is breaking the observable sequence grammar. `next (error | completed)?`\n" +
                    "    This behavior breaks the grammar because there is overlapping between sequence events.\n" +
                    "    Observable sequence is trying to send an event before sending of previous event has finished.\n" +
                    "  > Interpretation: " + synchronizationErrorMessage.rawValue +
                    "  > Remedy: If this is the expected behavior this message can be suppressed by adding `.observe(on:MainScheduler.asyncInstance)`\n" +
                    "    or by synchronizing sequence events in some other way.\n"
                )
            }
        }
 
        func unregister() {
            self.lock.performLocked { 
                let pointer = Unmanaged.passUnretained(Thread.current).toOpaque()
                self.threads[pointer] = (self.threads[pointer] ?? 1) - 1
                if self.threads[pointer] == 0 {
                    self.threads[pointer] = nil
                }
            }
        }
    }
 
#endif
 
/// RxSwift global hooks
public enum Hooks {
    
    // Should capture call stack
    public static var recordCallStackOnError: Bool = false
 
}