Rxjava可以非常方便的完成线程的切换,链式调用这种艺术般的设计深受开发者的喜爱。本节通过源码来深入了解一下这其中的原理。
网上有很多介绍的文章,但大部分都有些晦涩难懂。本文旨在由浅入深,一步步深入“虎穴”。
最基本调用
先看一下最简单的调用方式:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("subscribe---" + Thread.currentThread().getName());
System.out.println("发送数据:hello \n");
e.onNext("hello");
e.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe---" + Thread.currentThread().getName() + '\n');
}
@Override
public void onNext(String s) {
System.out.println("onNext---" + Thread.currentThread().getName() + '\n');
System.out.println("接收数据:" + s);
}
@Override
public void onComplete() {
System.out.println("onComplete---" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
可以说没有添加任何逻辑,只有发送与接收。这里把每一步的所在线程,发送和接收的数据,打印了出来。
运行结果:
onSubscribe---main
subscribe---main
发送数据:hello
onNext---main
接收数据:hello
onComplete---main
注意一下subscribe方法,代码里出现了两个。
第一个是我们发送数据用的,属于ObservableOnSubscribe接口;
第二个是提交整个链式调用的,属于ObservableSource接口。
不要搞混了,打印出来的是第一个。
然后注意一下打印出来的顺序,onSubscribe是先于subscribe的,稍后我们从源码中查看原因。
整个代码片段用到了两个Rxjava的方法,create创建最初的数据源;ObservableSource的subscribe创建数据接收者。
下面我们先看看create的实现:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
总共做了3件事:
- 检查source是否为空,合法性检查很常见,后面不再关注;
- 创建一个
ObservableCreate对象,将source传给它; - 调用
RxJavaPlugins.onAssembly方法,以ObservableCreate为参,并最终返回该方法的返回值。
我们先看看RxJavaPlugins.onAssembly是干嘛的,代码不搞透彻总觉得不够处女座:
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
从类名称也可以看出来,RxJavaPlugins是一个hook类。说人话就是,我们可以设置一些自己的操作。
比如上述方法中的onObservableAssembly我们可以设置一下,打印Observable的类名:
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println(observable);
return observable;
}
});
看看结果:
io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
onSubscribe---main
subscribe---main
发送数据:hello
onNext---main
接收数据:hello
onComplete---main
可以看到打印出来的就是第2步提到的ObservableCreate类。下面就来看一下这个类的构造方法:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
ObservableCreate会把传进来的source先保存下来。这里的source就是我们创建的用来发送数据的ObservableOnSubscribe。如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("subscribe---" + Thread.currentThread().getName());
System.out.println("发送数据:hello \n");
e.onNext("hello");
e.onComplete();
}
})
到此,create的流程走完了,我们看一下subscribe,注意是链式调用里面的:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看似很长的代码,其实真正相关的就是调用了subscribeActual方法:
protected abstract void subscribeActual(Observer<? super T> observer);
这是一个抽象方法,具体的实现,我们去子类中看。这里的子类就是我们的ObservableCreate:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
- 将
observer包装成CreateEmitter对象; - 调用
observer.onSubscribe,CreateEmitter作为参数; - 调用
source.subscribe,CreateEmitter作为参数。
再次说明一下,这里的observer和source是我们自己写的!
这里也能看到,onSubscribe是最先执行的方法。
我们调用e.onNext("hello");发送数据,这个e就是CreateEmitter对象。
然后看一下CreateEmitter:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
这里省略部分代码。该类是对observer的一个封装。
需要注意的是,这里的onNext onComplete继承自Emitter接口,而不是Observer接口。
Rxjava好多接口中存在同名方法,可能产生混乱,要记得区分。

添加subscribeOn
所有代码如下:
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println(observable);
return observable;
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("subscribe---" + Thread.currentThread().getName());
System.out.println("发送数据:hello \n");
e.onNext("hello");
e.onComplete();
}
})
.subscribeOn(Schedulers.single())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println();
System.out.println("onSubscribe---" + Thread.currentThread().getName() + '\n');
}
@Override
public void onNext(String s) {
System.out.println("onNext---" + Thread.currentThread().getName() + '\n');
System.out.println("接收数据:" + s);
}
@Override
public void onComplete() {
System.out.println("onComplete---" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
添加了subscribeOn(Schedulers.single()),看下打印结果:
io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
io.reactivex.internal.operators.observable.ObservableSubscribeOn@1ee0005
onSubscribe---main
subscribe---RxSingleScheduler-1
发送数据:hello
onNext---RxSingleScheduler-1
接收数据:hello
onComplete---RxSingleScheduler-1
有如下变化:
- 多了一个
ObservableSubscribeOn对象,该对象的创建过程与ObservableCreate类似 -
onSubscribe依旧在主线程调用,其他的都是在子线程
我们依次创建了ObservableCreate和ObservableSubscribeOn两个对象,那最终调用的就是ObservableSubscribeOn的subscribe方法。
先看一下ObservableSubscribeOn的构造器:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
除了source,还多了一个我们传入的Scheduler。
注意这里的source已经是ObservableCreate对象了。因为我们是在ObservableCreate对象上执行的subscribeOn方法。
我们最终调用的是ObservableSubscribeOn的subscribe方法,会执行到:
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,还是会先在subscribe方法调用的线程调用onSubscribe,然后在scheduler设置的线程运行SubscribeTask。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
任务很简单,在scheculer设置的线程执行source的subscribe方法。这样就完成了线程的切换。
这里还涉及到一个很重要的SubscribeOnObserver类:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
该类的作用:
1、保证只有一次onSubscribe回调,如下:
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
ObservableCreate调用的是这个类中的onSubscribe,该回调并不会继续向下传递,保证只回调一次。
2、及时dispose
3、其他
数据流向图:

添加observeOn
在subscribeOn后面添加一句observeOn(Schedulers.computation()),看下输出:
io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
io.reactivex.internal.operators.observable.ObservableSubscribeOn@1ee0005
io.reactivex.internal.operators.observable.ObservableObserveOn@25618e91
onSubscribe---main
subscribe---RxSingleScheduler-1
发送数据:hello
onNext---RxComputationThreadPool-1
接收数据:hello
onComplete---RxComputationThreadPool-1
可以看到,又多了一个ObservableObserveOn类。
看一下构造器:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
同样传入source和scheduler。
-
delayError——true,则接收到onError事件后,立即停止onNext分发,并向下传递onError;false,则等待队列中的onNext分发完毕后,再分发onError事件。 -
bufferSize——缓冲区大小,默认是128
我们最终调用的是ObservableObserveOn的subscribe方法,会执行到:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
若传入的是Schedulers.trampoline(),则不作任何处理,直接把下游的observer传递给上游的source;否则使用ObserveOnObserver包装下游的observer,并将ObserveOnObserver传递给上游的source。
先看下ObserveOnObserver的onSubscribe方法:
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
...
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//向下传递
actual.onSubscribe(this);
}
}
这里省略了一些代码,先不去管它。可以看到会创建一个SpscLinkedArrayQueue对象,初始大小为我们设置的bufferSize。实际上,它的结构是这样的:

可以看到是一个链式的数组,可以无限扩容,每次扩容都会new一个新的数组,旧数组的最后一个元素指向新数组。并且数组的容量是bufferSize+1。SpscLinkedArrayQueue在设置数组容量时,也会做限制,如下:
int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));
最小是8,且一定是2的n次方。这是为了方便查找Index,即将求余数简化为n&(bufferSize-1),提高效率。
看一下onNext:
@Override
public void onNext(T t) {
if (done) {
//一旦收到onError或onComplete,立即停止接收新的onNext事件
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
先不用考虑sourceMode,正常会将事件放入队列中,然后执行schedule:
void schedule() {
if (getAndIncrement() == 0) {//每次调用使计数+1。只有为0时,才会执行worker.schedule,防止重复调用
worker.schedule(this);
}
}
在worker线程执行run方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
正常执行到drainNormal:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {//队列为空时,最多自旋两次尝试取数据(missed控制)
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {//循环取数据
boolean d = done;
T v;
try {
//从队列取数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {//数据去完,跳出该层循环
break;
}
//向下游分发数据
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这样就完成了线程切换。
missed作用:若当前计数为10,则第一次调用,missed和当前计数变为9;第二次调用计数归0,完成两次自旋后跳出循环,防止cpu空转。这时因为置0了,所以若有新的onNext事件,schedule方法又可以重新调用worker.schedule,开启循环。
作者这种优化思路,非常值得借鉴。

至此,针对上面的例子,有如下结论:
- 每次链式调用都会创建一个新的
Observable对象 -
onSubscribe在当前线程调用 -
subscribeOn多次调用时,数据在第一次调用的线程中发送 -
observeOn多次调用时,数据在最后一次调用的线程中向下分发
上游是下游的source,下游是上游的observer,可以理解为一个双向链表结构。
subscribeOn控制上游所在的线程,observeOn控制下游所在的线程,这样就好理解多了。
多次调用observer
先看下代码吧。ObserveOnObserver#onSubscribe被我们省略的部分代码:
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
//qd就是上一层的ObserveOnObserver对象
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
...
if (m == QueueDisposable.ASYNC) {
//ASYNC
sourceMode = m;
//可以理解为就是上游的队列
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
//设置上游的标志位
outputFused = true;
return ASYNC;
}
return NONE;
}
下游的observeOn会拿到上游的队列。
然后在onNext中:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
//只有第一次的observeOn会执行入队逻辑
queue.offer(t);
}
schedule();
}
run方法:
@Override
public void run() {
if (outputFused) {
//非最后一个observeOn调用
drainFused();
} else {
//最后一个observeOn调用
drainNormal();
}
}
drainNormal已经介绍过了,就是从队列中取数据,向下分发。看下drainFused:
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
//向下分发null
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
可以看到只是向下分发了null。这很好理解,因为最下层的observeOn可以直接从最上层的observeOn的队列中取得本次数据。
通过学习源码可以发现,作者在可靠性和性能上做了很多的工作,这都是值得我们学习的地方。
说在最后
Rxjava无疑是一个非常优秀的链式调用框架,能极大减少开发量。不过也有一些弊端不可忽视:
- jar包有2MB
- 调用过程产生大量的临时对象,使用不当会产生严重的内存问题
总而言之,因地制宜,量体裁衣。









