//
|
// Observable+Concurrency.swift
|
// RxSwift
|
//
|
// Created by Shai Mishali on 22/09/2021.
|
// Copyright © 2021 Krunoslav Zaher. All rights reserved.
|
//
|
|
#if swift(>=5.6) && canImport(_Concurrency)
|
import Foundation
|
|
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
|
public extension ObservableConvertibleType {
|
/// Allows iterating over the values of an Observable
|
/// asynchronously via Swift's concurrency features (`async/await`)
|
///
|
/// A sample usage would look like so:
|
///
|
/// ```swift
|
/// do {
|
/// for try await value in observable.values {
|
/// // Handle emitted values
|
/// }
|
/// } catch {
|
/// // Handle error
|
/// }
|
/// ```
|
var values: AsyncThrowingStream<Element, Error> {
|
AsyncThrowingStream<Element, Error> { continuation in
|
var isFinished = false
|
let disposable = asObservable().subscribe(
|
onNext: { value in continuation.yield(value) },
|
onError: { error in
|
isFinished = true
|
continuation.finish(throwing: error)
|
},
|
onCompleted: {
|
isFinished = true
|
continuation.finish()
|
},
|
onDisposed: {
|
guard !isFinished else { return }
|
continuation.finish(throwing: CancellationError() )
|
}
|
)
|
continuation.onTermination = { @Sendable termination in
|
if case .cancelled = termination {
|
disposable.dispose()
|
}
|
}
|
}
|
}
|
}
|
|
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
|
public extension AsyncSequence {
|
/// Convert an `AsyncSequence` to an `Observable` emitting
|
/// values of the asynchronous sequence's type
|
///
|
/// - returns: An `Observable` of the async sequence's type
|
func asObservable() -> Observable<Element> {
|
Observable.create { observer in
|
let task = Task {
|
do {
|
for try await value in self {
|
observer.onNext(value)
|
}
|
|
observer.onCompleted()
|
} catch {
|
observer.onError(error)
|
}
|
}
|
|
return Disposables.create { task.cancel() }
|
}
|
}
|
}
|
#endif
|