| | |
| | | self.parent.dispose() |
| | | case .completed: |
| | | self.parent.group.remove(for: self.disposeKey) |
| | | if let next = self.parent.queue.dequeue() { |
| | | self.parent.subscribe(next, group: self.parent.group) |
| | | } |
| | | else { |
| | | self.parent.activeCount -= 1 |
| | | |
| | | if self.parent.stopped && self.parent.activeCount == 0 { |
| | | self.parent.forwardOn(.completed) |
| | | self.parent.dispose() |
| | | } |
| | | } |
| | | self.parent.dequeueNextAndSubscribe() |
| | | } |
| | | } |
| | | } |
| | |
| | | return self.group |
| | | } |
| | | |
| | | func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) { |
| | | @discardableResult |
| | | func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) -> Disposable { |
| | | let subscription = SingleAssignmentDisposable() |
| | | |
| | | let key = group.insert(subscription) |
| | |
| | | let disposable = innerSource.asObservable().subscribe(observer) |
| | | subscription.setDisposable(disposable) |
| | | } |
| | | return subscription |
| | | } |
| | | |
| | | func dequeueNextAndSubscribe() { |
| | | if let next = queue.dequeue() { |
| | | // subscribing immediately can produce values immediately which can re-enter and cause stack overflows |
| | | let disposable = CurrentThreadScheduler.instance.schedule(()) { _ in |
| | | // lock again |
| | | self.lock.performLocked { |
| | | self.subscribe(next, group: self.group) |
| | | } |
| | | } |
| | | _ = group.insert(disposable) |
| | | } |
| | | else { |
| | | activeCount -= 1 |
| | | |
| | | if stopped && activeCount == 0 { |
| | | forwardOn(.completed) |
| | | dispose() |
| | | } |
| | | } |
| | | } |
| | | |
| | | func performMap(_ element: SourceElement) throws -> SourceSequence { |