//
|
// ObservableType+Extensions.swift
|
// RxSwift
|
//
|
// Created by Krunoslav Zaher on 2/21/15.
|
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
|
//
|
|
#if DEBUG
|
import Foundation
|
#endif
|
|
extension ObservableType {
|
/**
|
Subscribes an event handler to an observable sequence.
|
|
- parameter on: Action to invoke for each event in the observable sequence.
|
- returns: Subscription object used to unsubscribe from the observable sequence.
|
*/
|
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
|
let observer = AnonymousObserver { e in
|
on(e)
|
}
|
return self.asObservable().subscribe(observer)
|
}
|
|
/**
|
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.
|
|
Also, take in an object and provide an unretained, safe to use (i.e. not implicitly unwrapped), reference to it along with the events emitted by the sequence.
|
|
- Note: If `object` can't be retained, none of the other closures will be invoked.
|
|
- parameter object: The object to provide an unretained reference on.
|
- parameter onNext: Action to invoke for each element in the observable sequence.
|
- parameter onError: Action to invoke upon errored termination of the observable sequence.
|
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
|
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
|
gracefully completed, errored, or if the generation is canceled by disposing subscription).
|
- returns: Subscription object used to unsubscribe from the observable sequence.
|
*/
|
public func subscribe<Object: AnyObject>(
|
with object: Object,
|
onNext: ((Object, Element) -> Void)? = nil,
|
onError: ((Object, Swift.Error) -> Void)? = nil,
|
onCompleted: ((Object) -> Void)? = nil,
|
onDisposed: ((Object) -> Void)? = nil
|
) -> Disposable {
|
subscribe(
|
onNext: { [weak object] in
|
guard let object = object else { return }
|
onNext?(object, $0)
|
},
|
onError: { [weak object] in
|
guard let object = object else { return }
|
onError?(object, $0)
|
},
|
onCompleted: { [weak object] in
|
guard let object = object else { return }
|
onCompleted?(object)
|
},
|
onDisposed: { [weak object] in
|
guard let object = object else { return }
|
onDisposed?(object)
|
}
|
)
|
}
|
|
/**
|
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.
|
|
- parameter onNext: Action to invoke for each element in the observable sequence.
|
- parameter onError: Action to invoke upon errored termination of the observable sequence.
|
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
|
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
|
gracefully completed, errored, or if the generation is canceled by disposing subscription).
|
- returns: Subscription object used to unsubscribe from the observable sequence.
|
*/
|
public func subscribe(
|
onNext: ((Element) -> Void)? = nil,
|
onError: ((Swift.Error) -> Void)? = nil,
|
onCompleted: (() -> Void)? = nil,
|
onDisposed: (() -> Void)? = nil
|
) -> Disposable {
|
let disposable: Disposable
|
|
if let disposed = onDisposed {
|
disposable = Disposables.create(with: disposed)
|
}
|
else {
|
disposable = Disposables.create()
|
}
|
|
#if DEBUG
|
let synchronizationTracker = SynchronizationTracker()
|
#endif
|
|
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
|
|
let observer = AnonymousObserver<Element> { event in
|
|
#if DEBUG
|
synchronizationTracker.register(synchronizationErrorMessage: .default)
|
defer { synchronizationTracker.unregister() }
|
#endif
|
|
switch event {
|
case .next(let value):
|
onNext?(value)
|
case .error(let error):
|
if let onError = onError {
|
onError(error)
|
}
|
else {
|
Hooks.defaultErrorHandler(callStack, error)
|
}
|
disposable.dispose()
|
case .completed:
|
onCompleted?()
|
disposable.dispose()
|
}
|
}
|
return Disposables.create(
|
self.asObservable().subscribe(observer),
|
disposable
|
)
|
}
|
}
|
|
import Foundation
|
|
extension Hooks {
|
public typealias DefaultErrorHandler = (_ subscriptionCallStack: [String], _ error: Error) -> Void
|
public typealias CustomCaptureSubscriptionCallstack = () -> [String]
|
|
private static let lock = RecursiveLock()
|
private static var _defaultErrorHandler: DefaultErrorHandler = { subscriptionCallStack, error in
|
#if DEBUG
|
let serializedCallStack = subscriptionCallStack.joined(separator: "\n")
|
print("Unhandled error happened: \(error)")
|
if !serializedCallStack.isEmpty {
|
print("subscription called from:\n\(serializedCallStack)")
|
}
|
#endif
|
}
|
private static var _customCaptureSubscriptionCallstack: CustomCaptureSubscriptionCallstack = {
|
#if DEBUG
|
return Thread.callStackSymbols
|
#else
|
return []
|
#endif
|
}
|
|
/// Error handler called in case onError handler wasn't provided.
|
public static var defaultErrorHandler: DefaultErrorHandler {
|
get {
|
lock.performLocked { _defaultErrorHandler }
|
}
|
set {
|
lock.performLocked { _defaultErrorHandler = newValue }
|
}
|
}
|
|
/// Subscription callstack block to fetch custom callstack information.
|
public static var customCaptureSubscriptionCallstack: CustomCaptureSubscriptionCallstack {
|
get {
|
lock.performLocked { _customCaptureSubscriptionCallstack }
|
}
|
set {
|
lock.performLocked { _customCaptureSubscriptionCallstack = newValue }
|
}
|
}
|
}
|