Fa0de737d60052b3776ccd7cf4ce8ec6
RxJava 原理剖析

RxJava这个项目已经持续四年半了,第一个 commit 是在 2012 年 3 月 18 号。我从 14 年 11 月份开始使用 RxJava,应该算是比较早的,将近两年过去了,现在 RxJava 1.x 版本已经进入稳定期,2.0 版本也已经进入了 RC 阶段。

原本打算把 Advanced RxJava 系列博客翻译完之后再拆 RxJava 的,但是前两周看了一个 JW 讲 RxJava 的视频,突然有种隐隐打通任督二脉的感觉,索性趁着中秋佳节,一鼓作气把 RxJava 好好拆开看个究竟。本文的分析基于 RxJava 截至 2016.9.16 的最新源码,非常建议大家下载 RxJava 源码之后,跟着本文,过一遍源码。

1,整体思路

拆轮子这也是第四回了,套路也算得到了很好的验证,顺着常用的场景/用例出发,理解整个过程、结构、原理,不要沉迷于细节,先对常用的内容有一个全局的概览,每一块的细节再按需深入。入手新项目也是这个思路。

对 RxJava 来说,基于目前已有的认识,我觉得主要应该抓住四个方面:

  • 事件流源头(observable)怎么发出数据
  • 响应者(subscriber)怎么收到数据
  • 怎么对事件流进行操作(operator/transformer)
  • 以及整个过程的调度(scheduler)

另外还有三点也值得一提:

  • backpressure
  • hook
  • 测试

2,Hello world

我们先看一个最简单的 Hello world 例子:

  • Observable.just("Hello world")
  • .subscribe(word -> {
  • System.out.println("got " + word + " @ "
  • + Thread.currentThread().getName());
  • });

2.1,just

逐行往下看显然是最自然的方式,那我们先看看 just()

  • // Observable.java
  • public static <T> Observable<T> just(final T value) {
  • return ScalarSynchronousObservable.create(value);
  • }
  • // ScalarSynchronousObservable.java
  • public static <T> ScalarSynchronousObservable<T> create(T t) {
  • return new ScalarSynchronousObservable<T>(t); // 1
  • }
  • protected ScalarSynchronousObservable(final T t) {
  • super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t))); // 2
  • this.t = t;
  • }

这里一定要注意,不要沉迷于细节,否则上万行的代码绝不是一两天能看出个大概来的。

  1. 我们创建的是 ScalarSynchronousObservable,一个 Observable 的子类。
  2. 我们先跳过 RxJavaHooks,从名字可以得知它是用来做一些 hook 的工作的,那我们就先认为它什么也不做。所以我们传给父类构造函数的就是 JustOnSubscribe,一个 OnSubscribe 的实现类。

Observable 的构造函数接受一个 OnSubscribe,它是一个回调,会在 Observable#subscribe 中使用,用于通知 observable 自己被订阅,它是怎么使用的,我们马上就能看到。

2.2,subscribe

我们接着看 subscribe()

  • public final Subscription subscribe(final Action1<? super T> onNext) {
  • // 省略参数检查代码
  • Action1<Throwable> onError =
  • InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
  • Action0 onCompleted = Actions.empty();
  • return subscribe(new ActionSubscriber<T>(onNext,
  • onError, onCompleted)); // 1
  • }
  • public final Subscription subscribe(Subscriber<? super T> subscriber) {
  • return Observable.subscribe(subscriber, this);
  • }
  • static <T> Subscription subscribe(Subscriber<? super T> subscriber,
  • Observable<T> observable) {
  • // 省略参数检查代码
  • subscriber.onStart(); // 2
  • if (!(subscriber instanceof SafeSubscriber)) {
  • subscriber = new SafeSubscriber<T>(subscriber); // 3
  • }
  • try {
  • RxJavaHooks.onObservableStart(observable,
  • observable.onSubscribe).call(subscriber); // 4
  • return RxJavaHooks.onObservableReturn(subscriber); // 5
  • } catch (Throwable e) {
  • // 省略错误处理代码
  • }
  • }

我们抓住主要逻辑:

  1. 我们首先对传入的 Action 进行包装,包装为 ActionSubscriber,一个 Subscriber 的实现类。
  2. 调用 subscriber.onStart() 通知 subscriber 它已经和 observable 连接起来了。这里我们就知道,onStart() 就是在我们调用 subscribe() 的线程执行的。
  3. 如果传入的 subscriber 不是 SafeSubscriber,那就把它包装为一个 SafeSubscriber
  4. 我们再次跳过 hook,认为它什么也没做,那这里我们调用的其实就是 observable.onSubscribe.call(subscriber)。这里我们就看到了前面提到的 onSubscribe 的使用代码,在我们调用 subscribe() 的线程执行这个回调。
  5. 跳过 hook,那么这里就是直接返回了 subscriberSubscriber 继承了 Subscription,用于取消订阅。

关于 SafeSubscriber,我们跳过源码,直接看它的文档,其中说明了这个类的作用:保证 Subscriber 实例遵循 Observable contract。至于具体怎么保证的,以及 contract 的内容,大家直接看文档即可,这里不再赘述。

好了,我们已经看完了例子中两个调用的代码,但是 Hello world 是怎么被传递到打印的代码里的呢?别急,就在 observable.onSubscribe.call(subscriber) 中。

2.3,OnSubscribe

还记得 just() 的实现中,我们创建了一个 JustOnSubscribe 吗?这里我们执行的就是它实现的 call() 函数:

  • // ScalarSynchronousObservable.java
  • static final class JustOnSubscribe<T> implements OnSubscribe<T> {
  • // ...
  • @Override
  • public void call(Subscriber<? super T> s) {
  • s.setProducer(createProducer(s, value));
  • }
  • }
  • static <T> Producer createProducer(Subscriber<? super T> s, T v) {
  • // ...
  • return new WeakSingleProducer<T>(s, v);
  • }

这里我们就是为 subscriber 设置了一个 WeakSingleProducer

在 RxJava 1.x 中,数据都是从 observable push 到 subscriber 的,但要是 observable 发得太快,subscriber 处理不过来,该怎么办?一种办法是,把数据保存起来,但这显然可能导致内存耗尽;另一种办法是,多余的数据来了之后就丢掉,至于丢掉和保留的策略可以按需制定;还有一种办法就是让 subscriber 向 observable 主动请求数据,subscriber 不请求,observable 就不发出数据。它俩相互协调,避免出现过多的数据,而协调的桥梁,就是 producer。producer 的内容这里不展开,大家可以看 ReactiveIO 的文档,或者 Advanced RxJava个系列博客。

2.4,setProducer

我们接着看 setProducer() 的实现:

  • // Subscriber.java
  • public void setProducer(Producer p) {
  • long toRequest;
  • boolean passToSubscriber = false;
  • synchronized (this) {
  • toRequest = requested;
  • producer = p;
  • if (subscriber != null) { // 1
  • if (toRequest == NOT_SET) {
  • passToSubscriber = true;
  • }
  • }
  • }
  • if (passToSubscriber) { // 2
  • subscriber.setProducer(producer);
  • } else {
  • if (toRequest == NOT_SET) { // 3
  • producer.request(Long.MAX_VALUE);
  • } else {
  • producer.request(toRequest);
  • }
  • }
  • }

这里逻辑比较复杂,但是我们理清我们当前所处的状态,就简单了:

  1. 我们这里确实有一层包装,ActionSubscriber -> SafeSubscriber
  2. 所以这里我们会发生一次 pass through,然后我们会进入 else 代码块。
  3. 这里所有的 requested 初始值都是 NOT_SET,所以我们会请求 Long.MAX_VALUE,即无限个数据。

2.5,request

那我们再看 WeakSingleProducer#request() 的实现:

  • // ScalarSynchronousObservable.java
  • static final class WeakSingleProducer<T> implements Producer {
  • // ...
  • @Override
  • public void request(long n) {
  • // 省略状态检查代码
  • Subscriber<? super T> a = actual;
  • if (a.isUnsubscribed()) {
  • return;
  • }
  • T v = value;
  • try {
  • a.onNext(v);
  • } catch (Throwable e) {
  • Exceptions.throwOrReport(e, a, v);
  • return;
  • }
  • if (a.isUnsubscribed()) {
  • return;
  • }
  • a.onCompleted();
  • }
  • }

我们看到,在 request() 中,终于调用了 subscriber 的 onNext()onCompleted(),那么,Hello world 就传递到了我们的 Action 中,并被打印出来了。

2.6,完整的过程

到这里我们就已经梳理出完整的调用过程了:

一切行为都由 subscribe 触发,而且都是直接的函数调用,所以都在调用 subscribe 的线程执行。

下面我们看一下调试时的调用栈:

确实和我们的分析结果一致。

3,操作符

我们把 Hello world 稍微变复杂一点,使用一个操作符:

  • Observable.just("Hello world")
  • .map(String::length)
  • .subscribe(word -> {
  • System.out.println("got " + word + " @ "
  • + Thread.currentThread().getName());
  • });

我们使用了一个 map 操作符,把字符串转换为它的长度。

3.1,map

  • // Observable.java
  • public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
  • return create(new OnSubscribeMap<T, R>(this, func));
  • }
  • public static <T> Observable<T> create(OnSubscribe<T> f) {
  • return new Observable<T>(RxJavaHooks.onCreate(f));
  • }

这里有两个小插曲,一是 map 的实现本来是利用 lift + Operator 实现的,但是后来改成了 create + OnSubscribeRxJava #4097);二是 lift 的实现本来是直接调用 observable 构造函数,后来改成了调用 createRxJava #4007)。后者先发生,引入了新的 hook 机制,前者则是为了提升一点性能。

所以这里实际上是 OnSubscribeMap 干活了。

3.2,OnSubscribeMap

那我们看看 OnSubscribeMap

  • public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
  • final Observable<T> source;
  • final Func1<? super T, ? extends R> transformer;
  • public OnSubscribeMap(Observable<T> source,
  • Func1<? super T, ? extends R> transformer) {
  • this.source = source;
  • this.transformer = transformer;
  • }
  • @Override
  • public void call(final Subscriber<? super R> o) {
  • MapSubscriber<T, R> parent =
  • new MapSubscriber<T, R>(o, transformer); // 1
  • o.add(parent); // 2
  • source.unsafeSubscribe(parent); // 3
  • }
  • }

它的实现很直观:

  1. 利用传入的 subscriber 以及我们进行转换的 Func1 构造一个 MapSubscriber
  2. 把一个 subscriber 加入到另一个 subscriber 中,是为了让它们可以一起取消订阅。
  3. unsafeSubscribe 相较于前面的 subscribe,可想而知就是少了一层 SafeSubscriber 的包装。为什么不要包装?因为我们会在最后调用 Observable#subscribe 时进行包装,只需要包装一次即可。

转换的代码依然没有出现,它在 MapSubscriber 中。

3.3,MapSubscriber

``` java
static final class MapSubscriber extends Subscriber {

  • final Subscriber<? super R> actual;
  • final Func1<? super T, ? extends R> mapper;
  • boolean done;
  • public MapSubscriber(Subscriber<? super R> actual,
  • Func1<? super T, ? extends R> mapper) {
  • this.actual = actual;
  • this.mapper = mapper;
  • }
  • @Override
  • public void onNext(T t) {
  • R result;
  • try {
  • result = mapper.call(t); // 1
  • } catch (Throwable ex) {
  • Exceptions.throwIfFatal(ex);
  • unsubscribe();
  • onError(OnErrorThrowable.addValueAsLastCause(ex, t));
  • return;
  • }
  • actual.onNext(result); // 2
  • }
  • // 省略 onError,onCompleted 和 setProducer

}

top Created with Sketch.