//
//  Producer.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 2/20/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
|
/// An `Observable` whose subscription logic is supplied through `run(_:cancel:)`.
///
/// `subscribe` creates a `SinkDisposer`, hands it to `run` as the cancel handle,
/// and stores the sink/subscription pair that `run` returns inside that disposer.
/// The base `run` only calls `rxAbstractMethod()`; subclasses are expected to
/// override it.
class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    /// Subscribes `observer` to this producer.
    ///
    /// When `CurrentThreadScheduler.isScheduleRequired` is `false`, the producer is
    /// run immediately and the resulting disposer is returned. Otherwise the same
    /// work is handed to `CurrentThreadScheduler.instance.schedule` and the
    /// disposable returned by the scheduler is returned instead.
    ///
    /// - Parameter observer: The observer that will receive this producer's events.
    /// - Returns: A disposable that tears down the sink and subscription.
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        guard CurrentThreadScheduler.isScheduleRequired else {
            return self.attach(observer)
        }

        return CurrentThreadScheduler.instance.schedule(()) { _ in
            return self.attach(observer)
        }
    }

    /// Runs the producer for `observer` and wires the result into a fresh disposer.
    ///
    /// - Parameter observer: The observer passed through to `run(_:cancel:)`.
    /// - Returns: The `SinkDisposer` holding the sink and subscription from `run`.
    private func attach<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let (sink, subscription) = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sink, subscription: subscription)
        return disposer
    }

    /// Starts producing for `observer`; this base implementation is a placeholder.
    ///
    /// - Parameters:
    ///   - observer: The observer that should receive events.
    ///   - cancel: The cancelable owned by the surrounding subscription.
    /// - Returns: The sink and the upstream subscription, both disposed together later.
    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}
|
|
/// A `Cancelable` that owns a sink and a subscription and disposes both together.
///
/// State is tracked as two bit flags in an `AtomicInt`, updated through `fetchOr`
/// (presumably an atomic fetch-and-or — confirm against its definition). Whichever
/// of `dispose()` / `setSinkAndSubscription(sink:subscription:)` observes the other
/// flag already set performs the actual teardown.
private final class SinkDisposer: Cancelable {
    /// Bit flags stored in `_state`.
    private enum DisposeState: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    private let _state = AtomicInt(0)
    private var _sink: Disposable?
    private var _subscription: Disposable?

    /// `true` once the `disposed` flag has been set in `_state`.
    var isDisposed: Bool {
        return isFlagSet(self._state, DisposeState.disposed.rawValue)
    }

    /// Stores the sink and subscription, disposing them right away if `dispose()`
    /// had already been called.
    ///
    /// Calling this a second time triggers `rxFatalError`.
    ///
    /// - Parameters:
    ///   - sink: The sink to dispose when this disposer is disposed.
    ///   - subscription: The subscription to dispose alongside the sink.
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        // References are stored before the flag is published.
        self._sink = sink
        self._subscription = subscription

        let setFlag = DisposeState.sinkAndSubscriptionSet.rawValue
        let priorFlags = fetchOr(self._state, setFlag)

        guard (priorFlags & setFlag) == 0 else {
            rxFatalError("Sink and subscription were already set")
        }

        // Not disposed yet: keep the references until `dispose()` is called.
        guard (priorFlags & DisposeState.disposed.rawValue) != 0 else {
            return
        }

        // `dispose()` ran before the pair was available, so tear down here.
        sink.dispose()
        subscription.dispose()
        self._sink = nil
        self._subscription = nil
    }

    /// Marks this disposer as disposed and, if the sink and subscription have been
    /// set, disposes them and drops the stored references.
    ///
    /// Repeated calls after the first return without doing anything.
    func dispose() {
        let disposedFlag = DisposeState.disposed.rawValue
        let priorFlags = fetchOr(self._state, disposedFlag)

        // Already disposed earlier.
        guard (priorFlags & disposedFlag) == 0 else {
            return
        }

        // Nothing stored yet; `setSinkAndSubscription` will see the disposed flag.
        guard (priorFlags & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 else {
            return
        }

        guard let sink = self._sink else {
            rxFatalError("Sink not set")
        }
        guard let subscription = self._subscription else {
            rxFatalError("Subscription not set")
        }

        sink.dispose()
        subscription.dispose()

        self._sink = nil
        self._subscription = nil
    }
}
|