杨锴
2024-08-14 909e20941e45f8712c012db602034b47da0bfdb0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//
//  GroupBy.swift
//  RxSwift
//
//  Created by Tomi Koskinen on 01/12/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
 
extension ObservableType {
    /*
     Groups the elements of an observable sequence according to a specified key selector function.
 
     - seealso: [groupBy operator on reactivex.io](http://reactivex.io/documentation/operators/groupby.html)
 
     - parameter keySelector: A function to extract the key for each element.
     - returns: A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
     */
    public func groupBy<Key: Hashable>(keySelector: @escaping (Element) throws -> Key)
        -> Observable<GroupedObservable<Key, Element>> {
        GroupBy(source: self.asObservable(), selector: keySelector)
    }
}
 
final private class GroupedObservableImpl<Element>: Observable<Element> {
    private var subject: PublishSubject<Element>
    private var refCount: RefCountDisposable
    
    init(subject: PublishSubject<Element>, refCount: RefCountDisposable) {
        self.subject = subject
        self.refCount = refCount
    }
 
    override public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        let release = self.refCount.retain()
        let subscription = self.subject.subscribe(observer)
        return Disposables.create(release, subscription)
    }
}
 
 
final private class GroupBySink<Key: Hashable, Element, Observer: ObserverType>
    : Sink<Observer>
    , ObserverType where Observer.Element == GroupedObservable<Key, Element> {
    typealias ResultType = Observer.Element 
    typealias Parent = GroupBy<Key, Element>
 
    private let parent: Parent
    private let subscription = SingleAssignmentDisposable()
    private var refCountDisposable: RefCountDisposable!
    private var groupedSubjectTable: [Key: PublishSubject<Element>]
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        self.groupedSubjectTable = [Key: PublishSubject<Element>]()
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        self.refCountDisposable = RefCountDisposable(disposable: self.subscription)
        
        self.subscription.setDisposable(self.parent.source.subscribe(self))
        
        return self.refCountDisposable
    }
    
    private func onGroupEvent(key: Key, value: Element) {
        if let writer = self.groupedSubjectTable[key] {
            writer.on(.next(value))
        } else {
            let writer = PublishSubject<Element>()
            self.groupedSubjectTable[key] = writer
            
            let group = GroupedObservable(
                key: key,
                source: GroupedObservableImpl(subject: writer, refCount: refCountDisposable)
            )
            
            self.forwardOn(.next(group))
            writer.on(.next(value))
        }
    }
 
    final func on(_ event: Event<Element>) {
        switch event {
        case let .next(value):
            do {
                let groupKey = try self.parent.selector(value)
                self.onGroupEvent(key: groupKey, value: value)
            }
            catch let e {
                self.error(e)
                return
            }
        case let .error(e):
            self.error(e)
        case .completed:
            self.forwardOnGroups(event: .completed)
            self.forwardOn(.completed)
            self.subscription.dispose()
            self.dispose()
        }
    }
 
    final func error(_ error: Swift.Error) {
        self.forwardOnGroups(event: .error(error))
        self.forwardOn(.error(error))
        self.subscription.dispose()
        self.dispose()
    }
    
    final func forwardOnGroups(event: Event<Element>) {
        for writer in self.groupedSubjectTable.values {
            writer.on(event)
        }
    }
}
 
final private class GroupBy<Key: Hashable, Element>: Producer<GroupedObservable<Key,Element>> {
    typealias KeySelector = (Element) throws -> Key
 
    fileprivate let source: Observable<Element>
    fileprivate let selector: KeySelector
    
    init(source: Observable<Element>, selector: @escaping KeySelector) {
        self.source = source
        self.selector = selector
    }
 
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == GroupedObservable<Key,Element> {
        let sink = GroupBySink(parent: self, observer: observer, cancel: cancel)
        return (sink: sink, subscription: sink.run())
    }
}