杨锴
2025-03-11 90dc3329d1973fda691e357cf4523d5c7c67fa1d
Pods/Alamofire/Source/Core/DataStreamRequest.swift
@@ -25,13 +25,13 @@
import Foundation
/// `Request` subclass which streams HTTP response `Data` through a `Handler` closure.
public final class DataStreamRequest: Request {
public final class DataStreamRequest: Request, @unchecked Sendable {
    /// Closure type handling `DataStreamRequest.Stream` values.
    public typealias Handler<Success, Failure: Error> = (Stream<Success, Failure>) throws -> Void
    public typealias Handler<Success, Failure: Error> = @Sendable (Stream<Success, Failure>) throws -> Void
    /// Type encapsulating an `Event` as it flows through the stream, as well as a `CancellationToken` which can be used
    /// to stop the stream at any time.
    public struct Stream<Success, Failure: Error> {
    public struct Stream<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable {
        /// Latest `Event` from the stream.
        public let event: Event<Success, Failure>
        /// Token used to cancel the stream.
@@ -45,7 +45,7 @@
    /// Type representing an event flowing through the stream. Contains either the `Result` of processing streamed
    /// `Data` or the completion of the stream.
    public enum Event<Success, Failure: Error> {
    public enum Event<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable {
        /// Output produced every time the instance receives additional `Data`. The associated value contains the
        /// `Result` of processing the incoming `Data`.
        case stream(Result<Success, Failure>)
@@ -55,7 +55,7 @@
    }
    /// Value containing the state of a `DataStreamRequest` when the stream was completed.
    public struct Completion {
    public struct Completion: Sendable {
        /// Last `URLRequest` issued by the instance.
        public let request: URLRequest?
        /// Last `HTTPURLResponse` received by the instance.
@@ -67,7 +67,7 @@
    }
    /// Type used to cancel an ongoing stream.
    public struct CancellationToken {
    public struct CancellationToken: Sendable {
        weak var request: DataStreamRequest?
        init(_ request: DataStreamRequest) {
@@ -81,7 +81,7 @@
    }
    /// `URLRequestConvertible` value used to create `URLRequest`s for this instance.
    public let convertible: URLRequestConvertible
    public let convertible: any URLRequestConvertible
    /// Whether or not the instance will be cancelled if stream parsing encounters an error.
    public let automaticallyCancelOnStreamError: Bool
@@ -90,16 +90,16 @@
        /// `OutputStream` bound to the `InputStream` produced by `asInputStream`, if it has been called.
        var outputStream: OutputStream?
        /// Stream closures called as `Data` is received.
        var streams: [(_ data: Data) -> Void] = []
        var streams: [@Sendable (_ data: Data) -> Void] = []
        /// Number of currently executing streams. Used to ensure completions are only fired after all streams are
        /// enqueued.
        var numberOfExecutingStreams = 0
        /// Completion calls enqueued while streams are still executing.
        var enqueuedCompletionEvents: [() -> Void] = []
        var enqueuedCompletionEvents: [@Sendable () -> Void] = []
        /// Handler for any `HTTPURLResponse`s received.
        var httpResponseHandler: (queue: DispatchQueue,
                                  handler: (_ response: HTTPURLResponse,
                                            _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void)?
                                  handler: @Sendable (_ response: HTTPURLResponse,
                                                      _ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void)?
    }
    let streamMutableState = Protected(StreamMutableState())
@@ -122,13 +122,13 @@
    ///   - delegate:                         `RequestDelegate` that provides an interface to actions not performed by
    ///                                       the `Request`.
    init(id: UUID = UUID(),
         convertible: URLRequestConvertible,
         convertible: any URLRequestConvertible,
         automaticallyCancelOnStreamError: Bool,
         underlyingQueue: DispatchQueue,
         serializationQueue: DispatchQueue,
         eventMonitor: EventMonitor?,
         interceptor: RequestInterceptor?,
         delegate: RequestDelegate) {
         eventMonitor: (any EventMonitor)?,
         interceptor: (any RequestInterceptor)?,
         delegate: any RequestDelegate) {
        self.convertible = convertible
        self.automaticallyCancelOnStreamError = automaticallyCancelOnStreamError
@@ -164,12 +164,11 @@
            }
            #endif
            state.numberOfExecutingStreams += state.streams.count
            let localState = state
            underlyingQueue.async { localState.streams.forEach { $0(data) } }
            underlyingQueue.async { [streams = state.streams] in streams.forEach { $0(data) } }
        }
    }
    func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) {
    func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping @Sendable (URLSession.ResponseDisposition) -> Void) {
        streamMutableState.read { dataMutableState in
            guard let httpResponseHandler = dataMutableState.httpResponseHandler else {
                underlyingQueue.async { completionHandler(.allow) }
@@ -200,7 +199,7 @@
    /// - Returns:              The `DataStreamRequest`.
    @discardableResult
    public func validate(_ validation: @escaping Validation) -> Self {
        let validator: () -> Void = { [unowned self] in
        let validator: @Sendable () -> Void = { [unowned self] in
            guard error == nil, let response else { return }
            let result = validation(request, response)
@@ -255,11 +254,12 @@
    ///
    /// - Returns:   The instance.
    @_disfavoredOverload
    @preconcurrency
    @discardableResult
    public func onHTTPResponse(
        on queue: DispatchQueue = .main,
        perform handler: @escaping (_ response: HTTPURLResponse,
                                    _ completionHandler: @escaping (ResponseDisposition) -> Void) -> Void
        perform handler: @escaping @Sendable (_ response: HTTPURLResponse,
                                              _ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void
    ) -> Self {
        streamMutableState.write { mutableState in
            mutableState.httpResponseHandler = (queue, handler)
@@ -275,9 +275,10 @@
    ///   - handler: Closure called when the instance produces an `HTTPURLResponse`.
    ///
    /// - Returns:   The instance.
    @preconcurrency
    @discardableResult
    public func onHTTPResponse(on queue: DispatchQueue = .main,
                               perform handler: @escaping (HTTPURLResponse) -> Void) -> Self {
                               perform handler: @escaping @Sendable (HTTPURLResponse) -> Void) -> Self {
        onHTTPResponse(on: queue) { response, completionHandler in
            handler(response)
            completionHandler(.allow)
@@ -296,7 +297,7 @@
    }
    func appendStreamCompletion<Success, Failure>(on queue: DispatchQueue,
                                                  stream: @escaping Handler<Success, Failure>) {
                                                  stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable {
        appendResponseSerializer {
            self.underlyingQueue.async {
                self.responseSerializerDidComplete {
@@ -317,7 +318,7 @@
    }
    func enqueueCompletion<Success, Failure>(on queue: DispatchQueue,
                                             stream: @escaping Handler<Success, Failure>) {
                                             stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable {
        queue.async {
            do {
                let completion = Completion(request: self.request,
@@ -340,9 +341,10 @@
    ///   - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
    ///
    /// - Returns:  The `DataStreamRequest`.
    @preconcurrency
    @discardableResult
    public func responseStream(on queue: DispatchQueue = .main, stream: @escaping Handler<Data, Never>) -> Self {
        let parser = { [unowned self] (data: Data) in
        let parser = { @Sendable [unowned self] (data: Data) in
            queue.async {
                self.capturingError {
                    try stream(.init(event: .stream(.success(data)), token: .init(self)))
@@ -366,11 +368,12 @@
    ///   - stream:     `StreamHandler` closure called as `Data` is received. May be called multiple times.
    ///
    /// - Returns:      The `DataStreamRequest`.
    @preconcurrency
    @discardableResult
    public func responseStream<Serializer: DataStreamSerializer>(using serializer: Serializer,
                                                                 on queue: DispatchQueue = .main,
                                                                 stream: @escaping Handler<Serializer.SerializedObject, AFError>) -> Self {
        let parser = { [unowned self] (data: Data) in
        let parser = { @Sendable [unowned self] (data: Data) in
            serializationQueue.async {
                // Start work on serialization queue.
                let result = Result { try serializer.serialize(data) }
@@ -407,10 +410,11 @@
    ///   - stream:     `StreamHandler` closure called as `Data` is received. May be called multiple times.
    ///
    /// - Returns:  The `DataStreamRequest`.
    @preconcurrency
    @discardableResult
    public func responseStreamString(on queue: DispatchQueue = .main,
                                     stream: @escaping Handler<String, Never>) -> Self {
        let parser = { [unowned self] (data: Data) in
        let parser = { @Sendable [unowned self] (data: Data) in
            serializationQueue.async {
                // Start work on serialization queue.
                let string = String(decoding: data, as: UTF8.self)
@@ -457,13 +461,15 @@
    ///   - stream:       `StreamHandler` closure called as `Data` is received. May be called multiple times.
    ///
    /// - Returns: The `DataStreamRequest`.
    @preconcurrency
    @discardableResult
    public func responseStreamDecodable<T: Decodable>(of type: T.Type = T.self,
                                                      on queue: DispatchQueue = .main,
                                                      using decoder: DataDecoder = JSONDecoder(),
                                                      preprocessor: DataPreprocessor = PassthroughPreprocessor(),
                                                      stream: @escaping Handler<T, AFError>) -> Self {
                                                      using decoder: any DataDecoder = JSONDecoder(),
                                                      preprocessor: any DataPreprocessor = PassthroughPreprocessor(),
                                                      stream: @escaping Handler<T, AFError>) -> Self where T: Sendable {
        responseStream(using: DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: preprocessor),
                       on: queue,
                       stream: stream)
    }
}
@@ -501,9 +507,9 @@
// MARK: - Serialization
/// A type which can serialize incoming `Data`.
public protocol DataStreamSerializer {
public protocol DataStreamSerializer: Sendable {
    /// Type produced from the serialized `Data`.
    associatedtype SerializedObject
    associatedtype SerializedObject: Sendable
    /// Serializes incoming `Data` into a `SerializedObject` value.
    ///
@@ -514,18 +520,18 @@
}
/// `DataStreamSerializer` which uses the provided `DataPreprocessor` and `DataDecoder` to serialize the incoming `Data`.
public struct DecodableStreamSerializer<T: Decodable>: DataStreamSerializer {
public struct DecodableStreamSerializer<T: Decodable>: DataStreamSerializer where T: Sendable {
    /// `DataDecoder` used to decode incoming `Data`.
    public let decoder: DataDecoder
    public let decoder: any DataDecoder
    /// `DataPreprocessor` incoming `Data` is passed through before being passed to the `DataDecoder`.
    public let dataPreprocessor: DataPreprocessor
    public let dataPreprocessor: any DataPreprocessor
    /// Creates an instance with the provided `DataDecoder` and `DataPreprocessor`.
    /// - Parameters:
    ///   - decoder: `        DataDecoder` used to decode incoming `Data`. `JSONDecoder()` by default.
    ///   - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the
    ///                       `decoder`. `PassthroughPreprocessor()` by default.
    public init(decoder: DataDecoder = JSONDecoder(), dataPreprocessor: DataPreprocessor = PassthroughPreprocessor()) {
    public init(decoder: any DataDecoder = JSONDecoder(), dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) {
        self.decoder = decoder
        self.dataPreprocessor = dataPreprocessor
    }
@@ -567,8 +573,8 @@
    ///   - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the
    ///                       `decoder`. `PassthroughPreprocessor()` by default.
    public static func decodable<T: Decodable>(of type: T.Type,
                                               decoder: DataDecoder = JSONDecoder(),
                                               dataPreprocessor: DataPreprocessor = PassthroughPreprocessor()) -> Self where Self == DecodableStreamSerializer<T> {
                                               decoder: any DataDecoder = JSONDecoder(),
                                               dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) -> Self where Self == DecodableStreamSerializer<T> {
        DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: dataPreprocessor)
    }
}