//
// Create.swift
// RxSwift
//
// Created by Krunoslav Zaher on 2/8/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
|
extension ObservableType {
    // MARK: create

    /**
     Builds an observable sequence whose `subscribe` behavior is supplied by the caller.

     The closure is stored and handed an `AnyObserver` each time the resulting sequence is
     run; the `Disposable` it returns is used as the subscription.

     - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

     - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
     - returns: The observable sequence with the specified implementation for the `subscribe` method.
     */
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
}
|
|
/// Sink placed between the user-supplied subscribe closure and the downstream observer.
///
/// It forwards `.next` events until the first `.error` / `.completed` has been handled,
/// forwards that single terminal event, disposes itself, and ignores everything afterwards.
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>

    // Termination flag: starts at 0 and is set to 1 by the first terminal event.
    private let isStopped = AtomicInt(0)

    #if DEBUG
    // Debug-only tracker registered around every `on(_:)` call.
    // NOTE(review): presumably reports unsynchronized/reentrant event delivery — confirm.
    private let synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    /// Handles one event coming from the subscribe closure.
    ///
    /// - parameter event: The event to forward, subject to the termination flag.
    func on(_ event: Event<Element>) {
        #if DEBUG
        synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { synchronizationTracker.unregister() }
        #endif

        switch event {
        case .next:
            // Drop elements once a terminal event has been seen.
            guard load(isStopped) != 1 else { return }
            forwardOn(event)
        case .error, .completed:
            // Only the caller that observes the flag as still 0 forwards the terminal
            // event (assumes `fetchOr` returns the value held before the OR — confirm).
            guard fetchOr(isStopped, 1) == 0 else { return }
            forwardOn(event)
            dispose()
        }
    }

    /// Invokes the parent's subscribe closure with this sink wrapped as an `AnyObserver`.
    ///
    /// - parameter parent: The observable whose `subscribeHandler` is executed.
    /// - returns: The `Disposable` produced by the subscribe closure.
    func run(_ parent: Parent) -> Disposable {
        let handler = parent.subscribeHandler
        return handler(AnyObserver(self))
    }
}
|
|
/// Producer that stores a caller-supplied subscribe closure and runs it through an
/// `AnonymousObservableSink` for each observer.
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    /// Closure executed by the sink when the sequence is run.
    let subscribeHandler: SubscribeHandler

    /// - parameter subscribeHandler: Closure to keep and invoke on each run.
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }

    /// Creates a sink for `observer`, runs the stored closure against it, and returns both.
    ///
    /// - parameter observer: Downstream observer that receives the forwarded events.
    /// - parameter cancel: Cancelable passed through to the sink.
    /// - returns: The sink and the disposable returned by the subscribe closure.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let anonymousSink = AnonymousObservableSink(observer: observer, cancel: cancel)
        return (sink: anonymousSink, subscription: anonymousSink.run(self))
    }
}
|