杨锴
2025-03-11 90dc3329d1973fda691e357cf4523d5c7c67fa1d
Pods/RxSwift/RxSwift/Observables/Merge.swift
@@ -169,17 +169,7 @@
            self.parent.dispose()
        case .completed:
            self.parent.group.remove(for: self.disposeKey)
            if let next = self.parent.queue.dequeue() {
                self.parent.subscribe(next, group: self.parent.group)
            }
            else {
                self.parent.activeCount -= 1
                if self.parent.stopped && self.parent.activeCount == 0 {
                    self.parent.forwardOn(.completed)
                    self.parent.dispose()
                }
            }
            self.parent.dequeueNextAndSubscribe()
        }
    }
}
@@ -236,7 +226,8 @@
        return self.group
    }
    
    func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) {
    @discardableResult
    func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) -> Disposable {
        let subscription = SingleAssignmentDisposable()
        
        let key = group.insert(subscription)
@@ -247,6 +238,28 @@
            let disposable = innerSource.asObservable().subscribe(observer)
            subscription.setDisposable(disposable)
        }
        return subscription
    }
    func dequeueNextAndSubscribe() {
        if let next = queue.dequeue() {
            // subscribing immediately can produce values immediately which can re-enter and cause stack overflows
            let disposable = CurrentThreadScheduler.instance.schedule(()) { _ in
                // lock again
                self.lock.performLocked {
                    self.subscribe(next, group: self.group)
                }
            }
            _ = group.insert(disposable)
        }
        else {
            activeCount -= 1
            if stopped && activeCount == 0 {
                forwardOn(.completed)
                dispose()
            }
        }
    }
    
    func performMap(_ element: SourceElement) throws -> SourceSequence {