0
点赞
收藏
分享

微信扫一扫

RxJava 源码笔记(1)


  1. Notification: An object representing a notification sent to an {@link Observable}
  • 注释已经解释的很清楚,代表的是响应式编程信息流中的一条消息(onNext/onError/onComplete)
  • 正如上面所说的,Notification承载的是复数种类型,因此需要一个Kind对象来进行标记属于哪种信息。
  • Kind是一个枚举类: 有三个枚举值: OnNext, OnError, OnCompleted 和Observer的回调一一对应,这也是必然的。
  • 同样,因为onNext会携带某个对象,而onError则会携带某个Throwable, 因此Notification中也需要维护这样两个引用。
  • onCompleted因为本身不携带任何信息,因此一个单例不变对象就足够了:
  • private static final Notification ON_COMPLETED = new Notification(Kind.OnCompleted, null, null);
  • Notification的构造算是工厂模式,其构造被封装在类本身的static的createOnXXXX函数中.
  • 如果你想把某个Notification**投递给一个Observer,可以调用accept**(Observer<? super T> observer)
  • 综述其实Notification是一个很简单的消息类型集成类,封装了一些辅助方法。
  1. NotificationLite: 其表征的含义和Notification一样,不过更多的作为内部使用(比如在materialize/dematerialize 这样的operator的实现中).
  • 关键的一点: NotificationLite类本身扮演的不是一条lite的Notification(简化的信息即LiteNotification),扮演这个角色的是其内部的2个单例对象和一个内部类, NotificationLite本身像是liteNotification的一个Utils类
  • liteNotification和Notificaton的区别: Notificatiion包含了完整信息,对数据进行了封装,本身就是一个完整的信息个体, 而LiteNotification不对数据进行封装,没有包含Kind等辅助信息,其使用必须依托于NotificationLite这个Utils提供的辅助函数**。
  • 采用了单例模式,NotifictionLite本身只有一个实例,因为如同开始说的,NotificationLite本质是一个Utils
  • 真正的liteNotification通过下面几类对象来区分的:
  • ON_COMPLETED_SENTINEL 单例的 Serializable对象,代表 onCompleted
  • ON_NEXT_NULL_SENTINEL 单例的 Serializable对象, 其代表的是onNext(null),对于非null的情况,onNext传输的对象本身就代表了onNext
  • class OnErrorSentinel implements Serializable, 代表 onError, 因为需要封装Throwable, 因此不能单例。
  • NotificationLite的accept函数接受一个Observer和一个Object:
  • 和Notification的accept不同,因为Notification自身已经代表了一条消息,那么显然只需要一个Observer就足够,而NotificationLite从其定位上讲,显然这时候扮演的Utils是需要一个LiteNotification(这里是一个Object对象)的。
  • 如果LiteNotification 是 ON_COMPLETED_SENTINEL,只需要调用Observer的onCompleted即可
  • 如果LiteNotification 是 ON_NEXT_NULL_SENTINEL,调用onNext(null)
  • 如果传入的Object是OnErrorSentinel类型,提取出Error并调用onError(Error)
  • 否则该对象对应的就是一个非空的onNext, 直接类型转化并调用onNext, onNext((T) object)
  • 不接受object等于null,因为这是无意义的行为(你投递了一条没有信息的信息).
  • NotificationLite的next(T t)函数和Notification的createOnNext代表的意义一样,都是得到一个onNext Notification. 但是因为LiteNotification的特性,next()函数的逻辑很简单,如果t是null,那么返回单例ON_NEXT_NULL_SENTINEL,否则直接返回t(因为accpect函数的逻辑要求这样)
  • completed()/error(Throwable e)同样,不赘述。
  • 也实现了isNext/isError等方法,同样基于LiteNotification的特性。
  • kind方法基于LiteNotification返回其代表的实际的Notification的Kind.
  • 综合看,NotifiationLite在使用时避免了拆包和封包(这里的包指的是Notification)的开销, 在内部使用时,完全可以取代Notification的角色(因为内部使用一般确实用不到Notification的完整信息,只关注行为本身)。
  1. SubjectSubscriptionManager<T> extends AtomicReference<SubjectSubscriptionManager.State<T>> implements OnSubscribe<T>
  • 实现了状态机功能(主要是Subject的某些特性需要) 和 OnSubcribe(Subject作为Observable的需要)。
  • Object latest: 保存了上一个发出的信息的值或者结束时的值。
  • Action1<SubjectObserver<T>> onStart/onAdded/onTerminated, 三个回调操作,其接受的参数是一个Observer,一般来说就是一个Subscriber:
  • onStart: 有一个新的Subscriber要进行subscribe,在其被加到state前会被调用。
  • onAdded: 在Subscriber被加入到state之后会被调用。
  • onTerminated: 当Subscriber想要subscribe一个terminal state会被调用。
  • nl = NotificationLite.instance(): 前面介绍过,内部使用的LiteNotification的Utils。
  • State本身的功能很简单,内部维护了:
  • boolean terminated 代表着信息流是否结束。
  • SubjectObserver[] observers, 一个subscribe了Subject的observer的集合列表。
  • add/remove函数可以增加/删除observer,不过要注意的是,add/remove本身并不改变对象本身(如果确实改变了observers), 而是返回一个应用了改变的新的state对象(为什么会这么做,下面在解释SubjectSubscriptionManager的add/remove时会解释)
  • 同样内部已经预置几个对应特殊场景的State对象(享元模式):
  • TERMINATED = new State(true, NO_OBSERVERS) 代表一个已经Terminate的Subject, 没有observer
  • EMPTY = new State(false, NO_OBSERVERS) 代表一个还没有Terminate的Subject, 没有observer
  • SubjectSubscriptionManager 本身继承自AtomicReference 因此其本身就可以看待为一个State。
  • 作为一个OnSubscribe,实现了call(final Subscriber<? super T> child)函数,在Subscriber subscribe时会被调用:
  • 首先将Subscriber(child)进一步装饰为一个SubjectObserver(bo)
  • 然后调用addUnsubscriber(child, bo)为Subscriber**添加一个Subscriber进行unSubscribe时的回调**, 该回调的操作是将bo从State的Observers中移除。
  • 回调onStart(bo)
  • 如果child没有被UnSubcribe, 那么调用add(bo)将bo加入到State的Observers中。
  • 为了对外提供访问/修改observers的方式, 有以下函数:
  • add(SubjectObserver<T> o):
  • 首先获取当前的state, 视为oldState
  • 如果发现oldState已经Terminate, 那么没有必要再将Observer进行add,直接回调onTerminated.
  • 否则调用state的add,注意State的特性,这里state对象本身不会改变,而是会返回一个修改过的新的State对象
  • 调用AtomicReference的CAS方法实现原子不加锁来更新为最新的State对象,并回调onAdd.
  • 上面的流程被包在一个do while(true)死循环中,能跳出的两个契机是state本身Terminate或者CAS新的State对象成功。
  • add/remove的实现解释了State的add/remove的特殊性,这里为了兼顾性能和多线程,采用的CAS这种不加锁的方式来实现了多线程安全,同样保证了性能
  • remove(SubjectObserver<T> o):
  • 基本同上。
  • observers(): 返回observers数组。
  • SubjectObserver<T> implements Observer<:
  • 应用了装饰模式, 对构造时传入的Subscriber进行了装饰,不过onNext/onComplete/onError则是直接转发给被装饰的Subscriber。
  • 装饰的新功能体现在对NotificatinLite的支持上.:
  • emitNext(Object n, final NotificationLite<T> nl):
  • 将n(一个LiteNotification, 因此需要NotificationLite)作为一个消息发出去。
  • 内部调用synchronized (this)会和下面两个函数竞争,从而阻塞其他函数。
  • 如果现在正在emit(发送信息),会将该消息加入到queue中.
  • emitFirst(Object n, final NotificationLite<T> nl);
  • 首先会判断是否真的是第一次emit(内部一个flag来保证),如果不是第一次,直接返回。
  • 如果正在emiting,那么也返回。
  • 调用emitLoop(null, n, nl)将n发送出去。
  • emitLoop(List<Object> localQueue, Object current, final NotificationLite<T> nl)
  • 先把localQueue中所有的信息发出去。
  • 然后再发送current。
  • next(Object n):
  • 设置一个LiteNotification为Latest,并且返回当前active的observers
  • terminate(Object n):
  • 设置一个LiteNotification为Latest
  • 设置active为false代表terminate
  • 获取当前的set,如果set已经terminate,那么返回State.NO_OBSERVERS代表没有一个observer
  • 否则设置当前的set为State.TERMINATED,并返回其observers。
  • 综合来看,SubjectSubscriptionManager实现了对多个observer的通知和管理,并且是多线程安全的,其作为OnSubscribe实现的功能主要是将subcriber加入到自己的observers中,并通过设置Subscriber,使其在unsubcribe时能被从observers中remove,同时还有onStart等回调来提供一套整体observers变化时的回调机制,这也贴切了其命名: 一个Subject的众多Subscription的manager
  1. BehaviorSubject:
  • 该Subject有这样一个特性: 当有Subscriber要subscribe它时,如果该Subject之前已经发出了信息或者结束,那么该Subscriber会收到最近一次发送的信息(包括结束), 例子:
    // observer will receive the “one”, “two” and “three” events, but not “zero”
    BehaviorSubject subject = BehaviorSubject.create(“default”);
    subject.onNext(“zero”);
    subject.onNext(“one”);
    subject.subscribe(observer);
    subject.onNext(“two”);
    subject.onNext(“three”);
    // observer will receive only onCompleted
    BehaviorSubject subject = BehaviorSubject.create(“default”);
    subject.onNext(“zero”);
    subject.onNext(“one”);
    subject.onCompleted();
    subject.subscribe(observer);
  • 从上面的例子可以看出BehaviorSubject的特性,同时还可以看到,BehaviorSubject是的,和通常的Observable不一样,BehaviorSubject不是在被Subscriber subscribe时才开始自己的信息流,完全可以通过调用其onXXXX方法在没有Subscriber的情况下开始自己的信息流
  • BehaviorSubject可以有多个Observer, 管理Observer以及向Observer投递信息的职责全部委托给了SubjectSubscriptionManager
  • BehaviorSubject在构造的时候就会创建一个SubjectSubscriptionManager, 同时BehaviorSubject的构造函数还接受一个defaultValue, 该defaultValue在Subject的信息流还没有真正开始时,如果Observer来Subscribe,会被作为信息传递给Observer
  • SubjectSubscriptionManager的onAdded和onTerminated被根据BehaviorSubject的特性来配置:
  • 传输Latest的信息给Observer. 通过调用SubjectObserver的emitFirst.
  • 这样就实现了BehaviorSubject的特性,在Observer subscribe时,会收到BehaviorSubject的最近一条信息。
  • 因为BehaviorSubject同时又是一个Observer,因此其也需要实现onNext/Error/Completeted等函数:
  • 因为Subject本身是一个中继,因此其onXXX方法本质都是遍历SubjectSubscriptionManager的observers并调用其emitNext来将信息传递给observers. 为了归一化onXXXX使得可以统一调用emitNext来处理,这里使用了NotificationLite来进行归一化
  • 作为一个的Observable, BehaviorSubject开始发送信息不再依赖于Subscriber的subscribe来触发,完全可以直接调用其onXXX方法来开始Subject的信息流,不必在意是否有Subscriber,信息流开始和发送完全自主。
  • 对于onError/Completed来说,除了调用emitNext将Notification散布给observers外,还需要调用SubjectSubscriptionManager的terminate(Object n(代表一条liteNotification))函数将SubjectSubscriptionManager的状态设置为active=false代表着信息流的终结, 并且释放observers.


举报

相关推荐

0 条评论