0%

RxSwift 订阅原理

从一段很简单的代码开始

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1
let observable = Observable<Any>.create { observer -> Disposable in
observer.onNext("hello")
observer.onCompleted()
return Disposables.create()
}
// 2
observable.subscribe(onNext: { text in
print("recv \(text)")
}, onCompleted: {
print("completed")
})
// 3

发出一个事件有很多种方法,我们选择最简单的Observable.create方法,它接受一个回调函数,用这个回调函数创建出一个序列

我们需要思考以下问题

  • 序列的事件是什么时候发送的?(1 or 2 or 3)
  • 观察者的代码是什么时候执行的?(1 or 2 or 3)

Observable.create

1
2
3
4
5
6
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
// 看这里↓
return AnonymousObservable(subscribe)
}
}

create的代码很简单,他把功能转到AnonymousObservable中实现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

let _subscribeHandler: SubscribeHandler // 看这里

init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler // 看这里
}

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}

AnonymousObservable存下了创建序列的回调,并在后续的run方法使用到了他

Observable.subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable

if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}

#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif

let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
// 看这里↓
let observer = AnonymousObserver<Element> { event in

#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif

switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}

总的来说,subscribe函数创建了一个AnonymousObserver,在他的事件处理函数里面对四个回调进行分发处理。并且后续调用了self.asObservable().subscribe(observer)
从这里能看得出来,示例代码都是方便我们使用的扩展,其本质是创建出AnonymousObservableAnonymousObserver,并且使用AnonymousObservable.subscribe(observer:)使他们链接,所以我们后续关注这三个点就好了

AnonymousObserver

观察者,他的作用就是对不同的事件进行处理,可以简单理解成按钮的点击事件。
因此他的实现也特别简单,我们从他的继承链出发,剖析他的作用:
AnonymousObserver -> ObserverBase -> Disposable & ObserverType

  1. Disposable & ObserverType
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public protocol Disposable {
    func dispose() // 看这里
    }

    public protocol ObserverType {
    associatedtype Element

    @available(*, deprecated, renamed: "Element")
    typealias E = Element

    func on(_ event: Event<Element>) // 看这里
    }
    Disposable就是RAII,用变量的生命周期绑定回调事件生效的生命周期;ObserverType定义了何为观察者,观察者必须实现其on函数,对不同的事件进行处理
  2. ObserverBase
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
        class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) { // 看这里
    switch event {
    case .next:
    if load(self._isStopped) == 0 {
    self.onCore(event)
    }
    case .error, .completed:
    if fetchOr(self._isStopped, 1) == 0 {
    self.onCore(event)
    }
    }
    }

    func onCore(_ event: Event<Element>) {
    rxAbstractMethod()
    }

    func dispose() {
    fetchOr(self._isStopped, 1)
    }
    }
    ```
    `ObserverBase`简单实现了两个协议,并定义了一个原子变量,标识事件之后可否继续,对事件上报到`onCore`函数处理
    3. `AnonymousObserver`
    ```swift
    final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void

    private let _eventHandler : EventHandler // 看这里

    init(_ eventHandler: @escaping EventHandler) {
    #if TRACE_RESOURCES
    _ = Resources.incrementTotal()
    #endif
    self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) { // 看这里
    return self._eventHandler(event)
    }

    #if TRACE_RESOURCES
    deinit {
    _ = Resources.decrementTotal()
    }
    #endif
    }
    AnonymousObserver保存用户传进来的eventHandler,并于onCore函数交予其处理

总结:AnonymousObserver保存用户的事件处理函数,并用Dispose管理生命周期

AnonymousObservable

序列,也称为被观察者,他的作用就是产生一系列的事件,并且可被观察者订阅
他的继承链如下:
AnonymousObservable -> Producer -> Observable -> ObservableType

  1. ObservableType
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public protocol ObservableType: ObservableConvertibleType {
    // 看这里
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
    }

    extension ObservableType {
    public func asObservable() -> Observable<Element> {
    return Observable.create { o in
    return self.subscribe(o)
    }
    }
    }
    ObservableType定义了何谓序列:可以被订阅的称之为序列。因此需要自行实现subscribe函数
  2. Observable
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class Observable<Element> : ObservableType {
    init() {
    #if TRACE_RESOURCES
    _ = Resources.incrementTotal()
    #endif
    }

    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    rxAbstractMethod()
    }

    public func asObservable() -> Observable<Element> {
    return self
    }

    deinit {
    #if TRACE_RESOURCES
    _ = Resources.decrementTotal()
    #endif
    }
    }
    Observable只实现了asObservable,返回为self,其他的并没有处理
  3. Producer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    class Producer<Element> : Observable<Element> {
    override init() {
    super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer) // 看这里
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

    return disposer
    }
    else {
    return CurrentThreadScheduler.instance.schedule(()) { _ in
    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer) // 看这里
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

    return disposer
    }
    }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    rxAbstractMethod()
    }
    }
    Producer实现了subscribe函数,他在里面判断了一下是否需要切换线程,然后将observer作为参数,委托到run函数处理。同时他自己声明了run函数,但保留实现
  4. AnonymousObservable
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
    self._subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel) // 看这里***
    let subscription = sink.run(self) // 看这里***
    return (sink: sink, subscription: subscription)
    }
    }
    AnonymousObservable作为其中一种序列,他选择了使用回调的方式创建序列,因此他保存了用户传进来的subscribeHandler,同时他实现了run函数,在内部使用了AnonymousObservableSink对observer进行处理

总结:AnonymousObservable是一个序列,他内部拥有切换线程的逻辑,他实现了subscribe函数,不过具体实现交给了AnonymousObservableSink.run处理

AnonymousObservableSink

终于来到了AnonymousObservableSink,上面的类实际上都是回调的封装,还加上一点线程切换以及生命周期的管理,真正决定运行时机的subscribe原理,就隐藏在这个类中
先看看他的继承链:
AnonymousObservableSink -> ObserverType & Sink
Sink -> Disposable
先记住,下面我们直接看他们的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>

// state
private let _isStopped = AtomicInt(0)

#if DEBUG
private let _synchronizationTracker = SynchronizationTracker()
#endif

override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}

func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}

func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Sink<Observer: ObserverType> : Disposable {
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
private let _disposed = AtomicInt(0)

#if DEBUG
private let _synchronizationTracker = SynchronizationTracker()
#endif

init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}

final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}

final func forwarder() -> SinkForward<Observer> {
return SinkForward(forward: self)
}

final var disposed: Bool {
return isFlagSet(self._disposed, 1)
}

func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}

deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}

整理一下,整个subscribe的关键就在于AnonymousObservable.run函数,在run函数中,他先创建了AnonymousObservableSink,并把observer传给了他

1
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)

AnonymousObservableSink是继承了Sink,实际上他把observer又传给了Sink
同时注意看!AnonymousObservableSink遵循了ObserverType协议,他实现了on方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}

on方法里面,他实际调用的是Sink.forwardOn方法

1
2
3
4
5
6
7
8
9
10
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}

Sink.forwardOn方法调用的是构造函数传进来的observer,这里接通了,也就是说现在AnonymousObservableSink已经传上了传进来的observer的衣服,调AnonymousObservableSink就等于调AnonymousObserver
最后看,创建完AnonymousObservableSink后一句!

1
let subscription = sink.run(self)

AnonymousObservableSink.run为:

1
2
3
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}

终于!在这里接通了!parent是AnonymousObservable,也就是序列,_subscribeHandler是序列里面用户传进来的创建序列的回调。AnyObserver(self)也就是说AnonymousObservableSink自己是被观察者,上面说到AnonymousObservableSink已经传上了AnonymousObserver的衣服,调他就等于调AnonymousObserver。因此在这里,一切都接通了!
所以回顾一下,本质上上面做的一切操作,到最后简化下来其实就是执行一次这样的代码:

1
2
3
4
(onNext: (Any)->Void, onCompleted: ()->Void, onError: ()->Void, onDisposed: ()->Void) -> Void {
onNext("hello")
onCompleted()
} ( {print("recv \($0)")}, {print("complete")}, {}, {} )

这代码执行的时机是 AnonymousObservableSink.run -> AnonymousObservable.run -> Producer.subscribe -> AnonymousObservable.subscribe -> observable.subscribe

总结

  • 序列的事件发送与观察者的代码是同时执行的
  • 他们执行的时机在subscribe上