RxSwift大家都已经很熟悉了,才会想了解核心逻辑的,基础的介绍就不在此赘述了,使用起来也非常方便,得益于RxSwift设计者设计的精简API,使用的步骤如下:
- 1.创建序列
- 2.订阅序列
- 3.发送信号
我们对RxSwift的使用已经非常熟练,对RxSwift的执行流程也有一定的了解,但可能对于代码层面的实现逻辑不是那么清晰,因为这需要去啃枯燥的源码才能获得,出于种种原因,可能并没有去啃,这里把我的心得分享一下。
最基础的使用代码
最开始学习RxSwift的时候,我们都运行过如下一段代码:
// 1.创建Int序列
_ = Observable<Int>.create({ (observer) -> Disposable in
// 3.发送信号
observer.onNext(666)
return Disposables.create()
}).subscribe(onNext: { (num) in // 2.订阅1创建的序列
print("订阅到的数字:\(num)")
})
// 打印结果:订阅到的数字:666
我们都知道,控制台肯定会打印订阅到的数字:666,但问题也随之而来,为什么在创建序列的闭包中发送的信号会在订阅的闭包中接收到,于是就该进入源码了。
源码分析
创建序列部分
Observable —— 可观察序列,这个我们都很熟悉了。
首先看下Observable的定义:
public class Observable<Element> : RxSwift.ObservableType {
/// Type of elements in sequence.
public typealias E = Element
public func subscribe<O>(_ observer: O) -> RxSwift.Disposable where Element == O.E, O : RxSwift.ObserverType
public func asObservable() -> RxSwift.Observable<RxSwift.Observable<Element>.E>
}
看Observable的定义,我们看到了熟悉的两个方法:
再看往上溯源,其实这两个方法是来自于实现的协议,并非类本身的能力。
订阅是ObservableType的能力;
public protocol ObservableType : ObservableConvertibleType {
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
转为可观察序列是ObservableConvertibleType的能力;
public protocol ObservableConvertibleType {
associatedtype E
func asObservable() -> Observable<E>
}
可,并没有看见创建序列的方法,Observable是怎么创建出来的呢?
既然本类里面没有,实现的协议中也没有,那就只可能是通过协议扩展增加的方法,于是我们检索extension ObservableType,发现了很多的扩展,暂时先忽略,把创建的扩展拎出来。
extension ObservableType {
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
}
当找到方法的定义之后,问题又来了,这只是定义,实现在哪里呢,jump根本不管用,于是继续检索,然后找到了具体的实现,原来是提供了默认实现的,在Create.swift类中,有如下实现:
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
看这个创建方法的默认实现,我们get了如下两个点:
- 通过
crate方法返回了一个AnonymousObservable -
AnonymousObservable保存了发送信号的闭包
我们先来看看AnonymousObservable是个啥,顾名思义,这肯定也是一个可观察序列,Anonymous是匿名的意思,那也就是说这是个匿名可观察序列,不暴露给外部使用的,在内部流转使用的。
我们看一下AnonymousObservable定义:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
看到这,我们get到了,创建可观察序列的发送信号的闭包是保存在了AnonymousObservable中。
到这,关于创建可观察序列的流程,就结束了。
但关于可观察序列的源码解读还没有结束,我们这里先看一下AnonymousObservable的继承链和协议实现链,后面看订阅的流转会清晰的多。

AnonymousObservable类继承自Producer类,Producer继承自Observable类,Observable类实现了ObservableType协议,ObservableType协议继承了ObservableConvertibleType协议。
ObservableType协议的扩展对继承自ObservableConvertibleType协议的asObservable()方法有默认实现。
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<E> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
return Observable.create { o in
return self.subscribe(o)
}
}
}
Observable类实现了协议链中的两个方法,但subscribe方法是不允许直接直接使用的,交给子类Producer重写:
public class Observable<Element> : ObservableType {
// 省略。。。
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
// 直接使用抛出异常
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
// 省略。。。
}
Producer除了重写subscribe方法,再定义了一个run方法给子类AnonymousObservable重写,run方法也是不允许直接使用的:
class Producer<Element> : Observable<Element> {
// 重写父类的 subscribe
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
// 定义给子类使用的方法,直接使用抛异常
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
AnonymousObservable中,除了定义了保存创建闭包的构造函数,就是重写父类的run方法:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
然后,就该到订阅的部分了,从订阅中找寻创建序列的发送信号的闭包是怎么流转的,是怎样才能订阅到发送的信号的。
订阅序列部分
我们通过示例代码中的subscribe跳转过去看到的是:
extension ObservableType {
// 省略...
public func subscribe(onNext: ((Self.E) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> RxSwift.Disposable
}
跟创建Observable的create方法一样的套路,对协议进行扩展,然后提供默认实现:
extension ObservableType {
// 省略...
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
我们看这个方法的实现,get到如下几个点:
- 创建了
AnonymousObserver实例,看着很眼熟,跟AnonymousObservable类似,创建了一个内部观察者,并传递了一个事件处理闭包,后面就能知道,信号发送就是通过这个闭包来回调的;
- 创建了
- 此处的
self其实就是AnonymousObservable;
- 此处的
-
self.asObservable()是基类协议ObservableConvertibleType的能力,而ObservableType是继承自ObservableConvertibleType的,上文中有说到该方法的默认实现,这个方法就是把ObservableType转成Observable——可观察序列;
-
-
self.asObservable().subscribe(observer),也是调用AnonymousObservable中的subscribe方法,如果记得上文中AnonymousObservable的继承和协议链的话(不记得话,回上文),就很清晰,其实这里调用的是Produce中的subscribe方法
-
那我们就到Produce中的subscribe方法来看具体操作了什么:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// 忽略...
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
在else分支中,调用了self.run,Producer只是定义,真正的实现是在AnonymousObservable中的重写:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
从run方法的实现中,我们可以get到如下几个点:
- 创建了一个
AnonymousObservableSink类型的sink管道,构造函数传入的参数是AnonymousObserver的实例observe
- 创建了一个
-
sink管道调用了自身的run方法,把self(也就是AnonymousObservable)当做参数传入
-
然后我们看一下AnonymousObservableSink类中run方法的实现:
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
这里是调用了AnonymousObservable中的_subscribeHandler闭包,这也就解释了为什么我们在订阅了序列之后会执行发送信号的闭包。
同时new了一个新类AnyObserver,也就是说将AnonymousObservableSink实例当做参数构造了一个AnyObserver实例,先来看下AnyObserver的定义:
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
注意看第二个构造函数,接收的是ObserverType类型的参数,将observer.on保存到自己的属性observer: EventHandler,也就是说保存到observer属性的不是AnonymousObservableSink实例,而是AnonymousObservableSink实例中的on方法。
看到这,其实也就明了了一件事,observer.onNext(666),其实就是AnyObserver.onNext(666)。
上面我们贴出了AnyObserver的类定义,里面是没有onNext方法的,那自然想到去找父类、协议或协议的扩展,然后看ObserverType协议的定义和扩展:
public protocol ObserverType {
associatedtype E
func on(_ event: Event<E>)
}
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
看着这些方法,是不是有种特别的熟悉的感觉,onNext、onCompleted和onError,我们在订阅时常用的方法,看方法的实现,也就意味着,observer.onNext(666)就变成了AnyObserver.on(.next(666)),然后我们回到AnyObserver中的on方法:
public func on(_ event: Event<Element>) {
return self.observer(event)
}
上文中说过,AnyObserver中self.observer其实就是AnonymousObservableSink实例中的on方法,那observer.onNext(666)就变成了AnonymousObservableSink.on(.next(666))。
于是,于是,我们又回到了AnonymousObservableSink中,到这,不得不感慨RxSwift作者神来一笔的设计,大神就是大神。。。
然后来看一眼on方法的实现:
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
内部调用了forwardOn方法,AnonymousObservableSink本类是没有这个方法的,于是我们找父类或协议,在父类Sink中找到了该方法:
final func forwardOn(_ event: Event<O.E>) {
self._observer.on(event)
}
self._observer是什么,上文中是不是说过创建AnonymousObservableSink时传入的参数就是我们订阅时创建的AnonymousObserver实例,所以observer.onNext(666)就变成了AnonymousObserver.on(.next(666)),但AnonymousObserver中是没有on方法的,但我们在父类ObserverBase中找到了方法的实现:
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
最终执行self.onCore(event),其实还是回到了AnonymousObserver中:
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
而self._eventHandler又是什么呢,是构造函数传入的闭包,于是就回到了订阅时的创建AnonymousObserver时的闭包了:
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
当self._eventHandler调用时,就会回调到上述闭包中,然后判断不同的事件类型,通过我们在订阅时创建的闭包根据不同的闭包回调出去,onNext、onError或onCompleted,这也就解释了我们在订阅后能接收到回调的原因。
总结
整个核心逻辑,看起来很复杂、很混乱,总结下来,如下几点:
- 1.创建序列时,创建一个闭包
A,同时创建了AnonymousObservable实例,将闭包A存在实例中 - 2.订阅序列时,创建一个闭包
B,同时创建了AnonymousObserve实例,将闭包B存在实例中 - 3.创建一个
AnonymousObservableSink实例,AnonymousObserve实例为AnonymousObservableSink构造函数的参数,AnonymousObserve实例存储在AnonymousObservableSink的父类Sink的_observer属性中 - 4.调用
AnonymousObservableSink实例的run方法,执行闭包A,同时创建一个AnyObserver实例 - 5.
AnyObserver实例保存了Sink中的on方法 - 6.闭包
A中发送信号,其实调用的就是存在AnonymousObservableSink中的AnonymousObserve实例父类ObserverBase中的on方法,最后又回到AnonymousObserve实例的onCore方法 - 7.
onCore方法的执行,实际上就是在执行闭包B,而闭包B执行就会根据事件类型回调我们的订阅闭包
好了,整个核心逻辑就是这样了,再次膜拜大神的设计~









