| | |
| | | import Foundation |
| | | |
| | | /// `Request` subclass which streams HTTP response `Data` through a `Handler` closure. |
| | | public final class DataStreamRequest: Request { |
| | | public final class DataStreamRequest: Request, @unchecked Sendable { |
| | | /// Closure type handling `DataStreamRequest.Stream` values. |
| | | public typealias Handler<Success, Failure: Error> = (Stream<Success, Failure>) throws -> Void |
| | | public typealias Handler<Success, Failure: Error> = @Sendable (Stream<Success, Failure>) throws -> Void |
| | | |
| | | /// Type encapsulating an `Event` as it flows through the stream, as well as a `CancellationToken` which can be used |
| | | /// to stop the stream at any time. |
| | | public struct Stream<Success, Failure: Error> { |
| | | public struct Stream<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable { |
| | | /// Latest `Event` from the stream. |
| | | public let event: Event<Success, Failure> |
| | | /// Token used to cancel the stream. |
| | |
| | | |
| | | /// Type representing an event flowing through the stream. Contains either the `Result` of processing streamed |
| | | /// `Data` or the completion of the stream. |
| | | public enum Event<Success, Failure: Error> { |
| | | public enum Event<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable { |
| | | /// Output produced every time the instance receives additional `Data`. The associated value contains the |
| | | /// `Result` of processing the incoming `Data`. |
| | | case stream(Result<Success, Failure>) |
| | |
| | | } |
| | | |
| | | /// Value containing the state of a `DataStreamRequest` when the stream was completed. |
| | | public struct Completion { |
| | | public struct Completion: Sendable { |
| | | /// Last `URLRequest` issued by the instance. |
| | | public let request: URLRequest? |
| | | /// Last `HTTPURLResponse` received by the instance. |
| | |
| | | } |
| | | |
| | | /// Type used to cancel an ongoing stream. |
| | | public struct CancellationToken { |
| | | public struct CancellationToken: Sendable { |
| | | weak var request: DataStreamRequest? |
| | | |
| | | init(_ request: DataStreamRequest) { |
| | |
| | | } |
| | | |
| | | /// `URLRequestConvertible` value used to create `URLRequest`s for this instance. |
| | | public let convertible: URLRequestConvertible |
| | | public let convertible: any URLRequestConvertible |
| | | /// Whether or not the instance will be cancelled if stream parsing encounters an error. |
| | | public let automaticallyCancelOnStreamError: Bool |
| | | |
| | |
| | | /// `OutputStream` bound to the `InputStream` produced by `asInputStream`, if it has been called. |
| | | var outputStream: OutputStream? |
| | | /// Stream closures called as `Data` is received. |
| | | var streams: [(_ data: Data) -> Void] = [] |
| | | var streams: [@Sendable (_ data: Data) -> Void] = [] |
| | | /// Number of currently executing streams. Used to ensure completions are only fired after all streams are |
| | | /// enqueued. |
| | | var numberOfExecutingStreams = 0 |
| | | /// Completion calls enqueued while streams are still executing. |
| | | var enqueuedCompletionEvents: [() -> Void] = [] |
| | | var enqueuedCompletionEvents: [@Sendable () -> Void] = [] |
| | | /// Handler for any `HTTPURLResponse`s received. |
| | | var httpResponseHandler: (queue: DispatchQueue, |
| | | handler: (_ response: HTTPURLResponse, |
| | | _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void)? |
| | | handler: @Sendable (_ response: HTTPURLResponse, |
| | | _ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void)? |
| | | } |
| | | |
| | | let streamMutableState = Protected(StreamMutableState()) |
| | |
| | | /// - delegate: `RequestDelegate` that provides an interface to actions not performed by |
| | | /// the `Request`. |
| | | init(id: UUID = UUID(), |
| | | convertible: URLRequestConvertible, |
| | | convertible: any URLRequestConvertible, |
| | | automaticallyCancelOnStreamError: Bool, |
| | | underlyingQueue: DispatchQueue, |
| | | serializationQueue: DispatchQueue, |
| | | eventMonitor: EventMonitor?, |
| | | interceptor: RequestInterceptor?, |
| | | delegate: RequestDelegate) { |
| | | eventMonitor: (any EventMonitor)?, |
| | | interceptor: (any RequestInterceptor)?, |
| | | delegate: any RequestDelegate) { |
| | | self.convertible = convertible |
| | | self.automaticallyCancelOnStreamError = automaticallyCancelOnStreamError |
| | | |
| | |
| | | } |
| | | #endif |
| | | state.numberOfExecutingStreams += state.streams.count |
| | | let localState = state |
| | | underlyingQueue.async { localState.streams.forEach { $0(data) } } |
| | | underlyingQueue.async { [streams = state.streams] in streams.forEach { $0(data) } } |
| | | } |
| | | } |
| | | |
| | | func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) { |
| | | func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping @Sendable (URLSession.ResponseDisposition) -> Void) { |
| | | streamMutableState.read { dataMutableState in |
| | | guard let httpResponseHandler = dataMutableState.httpResponseHandler else { |
| | | underlyingQueue.async { completionHandler(.allow) } |
| | |
| | | /// - Returns: The `DataStreamRequest`. |
| | | @discardableResult |
| | | public func validate(_ validation: @escaping Validation) -> Self { |
| | | let validator: () -> Void = { [unowned self] in |
| | | let validator: @Sendable () -> Void = { [unowned self] in |
| | | guard error == nil, let response else { return } |
| | | |
| | | let result = validation(request, response) |
| | |
| | | /// |
| | | /// - Returns: The instance. |
| | | @_disfavoredOverload |
| | | @preconcurrency |
| | | @discardableResult |
| | | public func onHTTPResponse( |
| | | on queue: DispatchQueue = .main, |
| | | perform handler: @escaping (_ response: HTTPURLResponse, |
| | | _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void |
| | | perform handler: @escaping @Sendable (_ response: HTTPURLResponse, |
| | | _ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void |
| | | ) -> Self { |
| | | streamMutableState.write { mutableState in |
| | | mutableState.httpResponseHandler = (queue, handler) |
| | |
| | | /// - handler: Closure called when the instance produces an `HTTPURLResponse`. |
| | | /// |
| | | /// - Returns: The instance. |
| | | @preconcurrency |
| | | @discardableResult |
| | | public func onHTTPResponse(on queue: DispatchQueue = .main, |
| | | perform handler: @escaping (HTTPURLResponse) -> Void) -> Self { |
| | | perform handler: @escaping @Sendable (HTTPURLResponse) -> Void) -> Self { |
| | | onHTTPResponse(on: queue) { response, completionHandler in |
| | | handler(response) |
| | | completionHandler(.allow) |
| | |
| | | } |
| | | |
| | | func appendStreamCompletion<Success, Failure>(on queue: DispatchQueue, |
| | | stream: @escaping Handler<Success, Failure>) { |
| | | stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable { |
| | | appendResponseSerializer { |
| | | self.underlyingQueue.async { |
| | | self.responseSerializerDidComplete { |
| | |
| | | } |
| | | |
| | | func enqueueCompletion<Success, Failure>(on queue: DispatchQueue, |
| | | stream: @escaping Handler<Success, Failure>) { |
| | | stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable { |
| | | queue.async { |
| | | do { |
| | | let completion = Completion(request: self.request, |
| | |
| | | /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times. |
| | | /// |
| | | /// - Returns: The `DataStreamRequest`. |
| | | @preconcurrency |
| | | @discardableResult |
| | | public func responseStream(on queue: DispatchQueue = .main, stream: @escaping Handler<Data, Never>) -> Self { |
| | | let parser = { [unowned self] (data: Data) in |
| | | let parser = { @Sendable [unowned self] (data: Data) in |
| | | queue.async { |
| | | self.capturingError { |
| | | try stream(.init(event: .stream(.success(data)), token: .init(self))) |
| | |
| | | /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times. |
| | | /// |
| | | /// - Returns: The `DataStreamRequest`. |
| | | @preconcurrency |
| | | @discardableResult |
| | | public func responseStream<Serializer: DataStreamSerializer>(using serializer: Serializer, |
| | | on queue: DispatchQueue = .main, |
| | | stream: @escaping Handler<Serializer.SerializedObject, AFError>) -> Self { |
| | | let parser = { [unowned self] (data: Data) in |
| | | let parser = { @Sendable [unowned self] (data: Data) in |
| | | serializationQueue.async { |
| | | // Start work on serialization queue. |
| | | let result = Result { try serializer.serialize(data) } |
| | |
| | | /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times. |
| | | /// |
| | | /// - Returns: The `DataStreamRequest`. |
| | | @preconcurrency |
| | | @discardableResult |
| | | public func responseStreamString(on queue: DispatchQueue = .main, |
| | | stream: @escaping Handler<String, Never>) -> Self { |
| | | let parser = { [unowned self] (data: Data) in |
| | | let parser = { @Sendable [unowned self] (data: Data) in |
| | | serializationQueue.async { |
| | | // Start work on serialization queue. |
| | | let string = String(decoding: data, as: UTF8.self) |
| | |
| | | /// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times. |
| | | /// |
| | | /// - Returns: The `DataStreamRequest`. |
| | | @preconcurrency |
| | | @discardableResult |
| | | public func responseStreamDecodable<T: Decodable>(of type: T.Type = T.self, |
| | | on queue: DispatchQueue = .main, |
| | | using decoder: DataDecoder = JSONDecoder(), |
| | | preprocessor: DataPreprocessor = PassthroughPreprocessor(), |
| | | stream: @escaping Handler<T, AFError>) -> Self { |
| | | using decoder: any DataDecoder = JSONDecoder(), |
| | | preprocessor: any DataPreprocessor = PassthroughPreprocessor(), |
| | | stream: @escaping Handler<T, AFError>) -> Self where T: Sendable { |
| | | responseStream(using: DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: preprocessor), |
| | | on: queue, |
| | | stream: stream) |
| | | } |
| | | } |
| | |
| | | // MARK: - Serialization |
| | | |
| | | /// A type which can serialize incoming `Data`. |
| | | public protocol DataStreamSerializer { |
| | | public protocol DataStreamSerializer: Sendable { |
| | | /// Type produced from the serialized `Data`. |
| | | associatedtype SerializedObject |
| | | associatedtype SerializedObject: Sendable |
| | | |
| | | /// Serializes incoming `Data` into a `SerializedObject` value. |
| | | /// |
| | |
| | | } |
| | | |
| | | /// `DataStreamSerializer` which uses the provided `DataPreprocessor` and `DataDecoder` to serialize the incoming `Data`. |
| | | public struct DecodableStreamSerializer<T: Decodable>: DataStreamSerializer { |
| | | public struct DecodableStreamSerializer<T: Decodable>: DataStreamSerializer where T: Sendable { |
| | | /// `DataDecoder` used to decode incoming `Data`. |
| | | public let decoder: DataDecoder |
| | | public let decoder: any DataDecoder |
| | | /// `DataPreprocessor` incoming `Data` is passed through before being passed to the `DataDecoder`. |
| | | public let dataPreprocessor: DataPreprocessor |
| | | public let dataPreprocessor: any DataPreprocessor |
| | | |
| | | /// Creates an instance with the provided `DataDecoder` and `DataPreprocessor`. |
| | | /// - Parameters: |
| | | /// - decoder: ` DataDecoder` used to decode incoming `Data`. `JSONDecoder()` by default. |
| | | /// - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the |
| | | /// `decoder`. `PassthroughPreprocessor()` by default. |
| | | public init(decoder: DataDecoder = JSONDecoder(), dataPreprocessor: DataPreprocessor = PassthroughPreprocessor()) { |
| | | public init(decoder: any DataDecoder = JSONDecoder(), dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) { |
| | | self.decoder = decoder |
| | | self.dataPreprocessor = dataPreprocessor |
| | | } |
| | |
| | | /// - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the |
| | | /// `decoder`. `PassthroughPreprocessor()` by default. |
| | | public static func decodable<T: Decodable>(of type: T.Type, |
| | | decoder: DataDecoder = JSONDecoder(), |
| | | dataPreprocessor: DataPreprocessor = PassthroughPreprocessor()) -> Self where Self == DecodableStreamSerializer<T> { |
| | | decoder: any DataDecoder = JSONDecoder(), |
| | | dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) -> Self where Self == DecodableStreamSerializer<T> { |
| | | DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: dataPreprocessor) |
| | | } |
| | | } |