杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//
//  TailRecursiveSink.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 3/21/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
enum TailRecursiveSinkCommand {
    case moveNext
    case dispose
}
 
#if DEBUG || TRACE_RESOURCES
    public var maxTailRecursiveSinkStackSize = 0
#endif
 
/// This class is usually used with `Generator` version of the operators.
class TailRecursiveSink<Sequence: Swift.Sequence, Observer: ObserverType>
    : Sink<Observer>
    , InvocableWithValueType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
    typealias Value = TailRecursiveSinkCommand
    typealias Element = Observer.Element 
    typealias SequenceGenerator = (generator: Sequence.Iterator, remaining: IntMax?)
 
    var generators: [SequenceGenerator] = []
    var disposed = false
    var subscription = SerialDisposable()
 
    // this is thread safe object
    var gate = AsyncLock<InvocableScheduledItem<TailRecursiveSink<Sequence, Observer>>>()
 
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
 
    func run(_ sources: SequenceGenerator) -> Disposable {
        self.generators.append(sources)
 
        self.schedule(.moveNext)
 
        return self.subscription
    }
 
    func invoke(_ command: TailRecursiveSinkCommand) {
        switch command {
        case .dispose:
            self.disposeCommand()
        case .moveNext:
            self.moveNextCommand()
        }
    }
 
    // simple implementation for now
    func schedule(_ command: TailRecursiveSinkCommand) {
        self.gate.invoke(InvocableScheduledItem(invocable: self, state: command))
    }
 
    func done() {
        self.forwardOn(.completed)
        self.dispose()
    }
 
    func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        rxAbstractMethod()
    }
 
    // should be done on gate locked
 
    private func moveNextCommand() {
        var next: Observable<Element>?
 
        repeat {
            guard let (g, left) = self.generators.last else {
                break
            }
            
            if self.isDisposed {
                return
            }
 
            self.generators.removeLast()
            
            var e = g
 
            guard let nextCandidate = e.next()?.asObservable() else {
                continue
            }
 
            // `left` is a hint of how many elements are left in generator.
            // In case this is the last element, then there is no need to push
            // that generator on stack.
            //
            // This is an optimization used to make sure in tail recursive case
            // there is no memory leak in case this operator is used to generate non terminating
            // sequence.
 
            if let knownOriginalLeft = left {
                // `- 1` because generator.next() has just been called
                if knownOriginalLeft - 1 >= 1 {
                    self.generators.append((e, knownOriginalLeft - 1))
                }
            }
            else {
                self.generators.append((e, nil))
            }
 
            let nextGenerator = self.extract(nextCandidate)
 
            if let nextGenerator = nextGenerator {
                self.generators.append(nextGenerator)
                #if DEBUG || TRACE_RESOURCES
                    if maxTailRecursiveSinkStackSize < self.generators.count {
                        maxTailRecursiveSinkStackSize = self.generators.count
                    }
                #endif
            }
            else {
                next = nextCandidate
            }
        } while next == nil
 
        guard let existingNext = next else {
            self.done()
            return
        }
 
        let disposable = SingleAssignmentDisposable()
        self.subscription.disposable = disposable
        disposable.setDisposable(self.subscribeToNext(existingNext))
    }
 
    func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        rxAbstractMethod()
    }
 
    func disposeCommand() {
        self.disposed = true
        self.generators.removeAll(keepingCapacity: false)
    }
 
    override func dispose() {
        super.dispose()
        
        self.subscription.dispose()
        self.gate.dispose()
        
        self.schedule(.dispose)
    }
}