9fc333783677465398b50b5259231c5b
022 | 理解和应用响应式编程

近几年,一些新的编程范式开始在 App 端和前端流行起来,并慢慢扩展到了后端,包括响应式编程、函数式编程。这些编程范式的编程思想、编程习惯与我们熟知的面向对象编程有着很大不同,但不少人对其一知半解,理解也不深入,应用起来也容易陷入一些误区。因此,我将用几篇文章分别讲解这些新的编程范式,帮助大伙更好地理解和应用这些编程范式。本篇文章先来聊聊响应式编程。

响应式编程

根据维基百科的定义,响应式编程(Reactive Programming)是关于数据流(data streams)变化传播(propagation of change)的一种声明式(declarative)的编程范式。

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.

上面的定义涉及到三个关键词:数据流、变化传播、声明式

所谓数据流,就是一个按照时间排序的数据/事件序列,任何事物,如点击事件、网络请求,都可以用数据流来表达。一条数据流可以包含有多个数据或事件,比如我们打开一个 App,一般会经历数据初始化、登录、请求用户数据、数据展示等一系列流程,这个流程就形成了一条数据流。数据流中的每一步是有顺序的,上游业务完成之后,即是数据/事件发生了变化,这种变化的结果需要通知下游,通知一般就是通过异步监听的方式,下游收到通知之后,才会进行自己的工作。这样,就实现了变化传播。数据流还可以变换、过滤、合并等。

而所谓声明式,其实是和命令式相对立的。命令式编程会告诉计算机“如何”去计算,关心的是具体怎么做;而声明式编程则告诉计算机计算”什么“,它关心的是结果,而不是怎么做的过程。

很多人都会举下面这个例子:

int a = 1;
int b = a + 1;
a = 2;

这时候,b 的值是多少呢?结果是 2,对 a 的第二次赋值并不会影响 b 的值。

假设引入一种新的赋值方式 :=,表示一种对 a 的绑定关系,如下代码:

int a = 1;
int b := a + 1;
a = 2;

这次,b 与 a 有绑定关系了,所以 b 能够根据 a 的变化而变化,因此,这时候 b 的值变为 3 了。

在这个例子中,我们就可以说 := 是一种声明式赋值方式,而普通的 = 是一种命令式赋值方式。

其实,这也是响应式的思想,它希望有某种方式能够构建关系,而不是执行某种赋值命令。响应式编程里所说的数据流,其实也是一系列具有依赖关系的业务逻辑,也称为事务。因此,也有人总结说,响应式编程,其实也是通过异步和数据流来构建事务关系的编程模型。其实就是对数据流的变化进行响应,而因为这个变化什么时候是未知的,因此需要用异步监听的方式来处理。

现在的 App 有着大量与 UI 相关的事务关系,这也是为什么响应式编程先在 App 端流行起来的原因。

如今,每个平台都有相应的响应式编程库,RxJava、RxSwift、RxJS 主要分别用于 Android、iOS 和 Web 开发,还有 WebFlux 是 Spring Framework 5.0 新增的响应式编程框架,用于服务端的开发。JDK 9 也加入了 Flow API 用于支持响应式编程。

响应式流

响应式流(Reactive Streams)是一个响应式编程的规范,用来为具有非阻塞背压(Backpressure)的异步流处理提供标准,用最小的一组接口、方法和协议,用来描述必要的操作和实体。这里涉及到一个关键概念叫 Backpressure,国内大部分翻译为背压,我们先来了解这是什么。

响应式编程,其实就是对数据流的编程,而对流的处理,前面我们说过,对数据流的变化进行响应,是通过异步监听的方式来处理的。既然是异步监听,就涉及到监听事件的发布者和订阅者,数据流其实就是由发布者生产,再由一个或多个订阅者进行消费的元素(item)序列。

那么,如果发布者生产元素的速度和订阅者消费元素的速度不一样,是否会出现问题呢?其实就两种情况:

  • 发布者生产的速度比订阅者消费的速度慢,那生产的元素可以及时被处理,订阅者处理完只要等待发布者发送下一元素即可,这不会产生什么问题。
  • 发布者生产的速度比订阅者消费的速度快,那生产的元素无法被订阅者及时处理,就会产生堆积,如果堆积的元素多了,订阅者就会承受巨大的资源压力(pressure)而有可能崩溃。

要应对第二种情况,就需要进行流控制(flow control)。流控制有多种方案,其中一种机制就是 Backpressure,即背压机制,其实就是下游能够向上游反馈流量需求的机制。

响应式流规范只定义了四个接口:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

发布者 Publisher

发布者接口只定义了一个 subscribe 方法,用来接受订阅者进行订阅。T 代表发布者和订阅者之间传输的数据类型。发布者绑定订阅者对象之后,就可以调用订阅者的方法,向订阅者发送各种事件。 subscribe 方法可以被调用多次,每次接受不同的订阅者。

订阅者 Subscriber

订阅者接口定义了四个事件方法,这些方法都是由发布者触发调用的,分别会在开启订阅、接收数据、发生错误和数据传输结束时被调用。onSubscribeonErroronComplete 最多只会被调用一次,而 onNext 一般会被调用多次,数据流中的每个元素就是通过该方法从发布者传递给订阅者的。

订阅 Subscription

订阅接口声明了两个方法,这两个方法都只会由订阅者调用,request(n) 表示订阅者请求元素的数量,该方法也是实现背压机制的关键方法;cancel 表示取消订阅,用来停止接收数据。当执行发布者的 subscribe 方法时,发布者会调用订阅者的 onSubscribe 方法,订阅者就会借助传入的 Subscription 对象向发布者请求 n 个元素。然后发布者通过不断调用订阅者的 onNext 方法向订阅者发送最多 n 个元素。如果元素全部发完,则会调用 onComplete 通知订阅者数据流已经发完。如果有错误发生,则调用 onError 发出错误通知,同样也会终止流。

处理器 Processor

处理器既是发布者又是订阅者,用于在发布者和订阅者之间转换数据格式,把发布者的 T 类型数据转换为订阅者接受的 R 类型数据。所以,处理器其实就是数据转换的中介。可以有多个处理器同时使用,组成一个处理链。

以上就是响应式流规范里的基本内容了,不少响应式的库都遵循了此规范,但具体实现上都比这规范复杂很多。所以,要真正理解响应式编程的魅力,并能应用到实际开发中,还是要从实际的框架出发,下面就以 RxJava 为例,聊聊响应式编程具体怎么做。

RxJava

RxJava 已经广泛应用在 Android 开发中,目前已经更新到 3.x 版本。Android 广泛应用的网络框架 Retrofit 也添加了 RxJava 的适配器,可以和 RxJava 结合使用。还有另外一个也广泛应用于 Android 的框架叫 RxBinding,它是一套用于 Android UI 控件的 RxJava Binding API,可以将 Button 的点击事件、ListView 的点击事件、EditText 的文本变化事件等转为响应式数据流处理。

RxJava 的实现思路主要是对观察者模式的扩展,观察者模式中,就有被观察者(Observable)观察者(Observer),分别对应于发布-订阅模式的发布者(Publisher)订阅者(Subscriber)。RxJava 实现了多套观察者模式,每套实现的基础类分别为:

  • Flowable:这是遵循了响应式流规范的一套实现,支持背压机制
  • Observable:这是不支持背压的一套实现
  • Single:只有一个元素或者错误的流
  • Completable:没有任何元素,只有一个完成或错误信号的流
  • Maybe:没有任何元素或者只有一个元素或者只有一个错误的流

这些不同的实现,大部分操作功能类似,下面的讲解就以 Flowable 为主。

根据 RxJava 的文档描述,Flowable 类实现了响应式流规范并提供了多个工厂方法和中转的操作符,以及消费响应式数据流的能力。

工厂方法

Flowable 继承了响应式流中定义的 Publisher 接口,因此,Flowable 其实也是一个发布者。但 Flowable 本身又只是一个抽象类,所以无法直接用 new 进行实例化。但 Flowable 提供了很多工厂方法用来创建实例,有 create、defer、repeat、just、range、interval、intervalRange、timer、fromXXX 等各种方法。除了 Flowable 类有这些方法,Observable 类也一样有这些方法,只是个别方法的参数不同。Single、Completable、Maybe 也有大部分一样的方法。

其中大部分方法在网上都能找到很好的说明,我就不再赘述了,这里我推荐一篇文章《RxJava2 只看这一篇文章就够了》,该文罗列了非常多的操作符方法,也包括创建发布者的工厂方法。

我这里要补充讲讲 Flowable 的 create 方法,该方法需要传入两个参数,第一个参数为 FlowableOnSubscribe 对象,这是一个接口,和 Publisher 一样,只定义了一个 subscribe 方法,数据流的发射就在该方法里实现;第二个参数为 BackpressureStrategy 枚举类型的数据,用来指定背压策略。

为了应对背压,Flowable 设置了一个缓存区用来缓存未处理的数据,缓存区的容量默认为 128,即只能缓存 128 个元素。那么,当缓存满了之后,上游还继续发射元素的情况下该怎么处理?背压策略就是应对这种情况下的处理策略。BackpressureStrategy 共定义了五种策略:

  • BUFFER:这是 Flowable 的默认策略,会将新元素缓存进另一个无大小限制的缓存区
  • ERROR:缓存区只要超限制,直接抛出异常MissingBackpressureException
  • DROP:缓存区满了之后,再接收到的元素会直接被丢弃
  • LATEST:会缓存最后的一个元素,即如果发送了140个元素,缓存区里会保存129个元素,除了第1到第128个元素,还会缓存最后的第140个元素
  • MISSING:不会做缓存或丢弃处理,即在create方法不指定背压策略,需要下游通过背压操作符onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest()指定背压策略

中转操作符

响应式编程最大的魅力其实就是有各种各样的操作符可以将数据流进行各种转换、过滤、组合等操作。上面提到的那些工厂方法其实也是操作符,可以归类为创建操作符。

top Created with Sketch.