11.1.1 定义响应式流

Reactive Streams 是 2013 年底由 Netflix、Lightbend 和 Pivotal(Spring 背后的公司)的工程师发起的一项计划。响应式流旨在为无阻塞异步 Backpressure 流处理提供一个标准。

我们已经谈到了响应式编程的异步特性;它使我们能够并行执行任务以获得更大的可伸缩性。Backpressure(译者注:https://www.zhihu.com/question/49618581/answer/237078934 )是一种手段,通过对用户愿意处理的数据量设定限制,数据消费者可以避免被生产速度过快的数据淹没。

Java StreamsReactive Streams 对比

在 Java 流和响应式流之间有很大的相似性。首先,它们的名字中都含有 Streams。它们也都为处理数据提供函数式接口。事实上,稍后当学到容器的时候,您会看到,其实它们有很多共同操作。

然而,Java 流通常是同步的,同时只能处理有限数据集。它们本质上是使用函数式进行集合迭代的一种手段。

响应式流支持任何大小的数据集,包括无限数据集的异步处理。它们使实时处理数据成为了可能。

响应式流的规范可以通过四个接口定义来概括:Publisher,Subscriber,Subscription 和 Processor。Publisher 为每一个 Subscription 的 Subscriber 生产数据。Publisher 接口声明了一个 subscribe() 方法,通过这个方法 Subscriber 可以订阅 Publisher:

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

Subscriber 一旦进行了订阅,就可以从 Publisher 中接收消息,这些消息都是通过 Subscriber 接口中的方法进行发送:

public interface Subscriber<T> {
    void onSubscribe(Subscription sub);
    void onNext(T item);
    void onError(Throwable ex);
    void onComplete();
}

Subscriber 通过调用 onSubscribe() 函数将会收到第一个消息。当 Publisher 调用 onSubscribe(),它通过一个 Subscription 对象将消息传输给 Subscriber。消息是通过 Subscription 进行传递的,Subscriber 可以管理他自己的订阅内容:

public interface Subscription {
    void request(long n);
    void cancel();
}

Subscriber 可以调用 request() 去请求被被发送了的数据,或者调用 cancel() 来表明他对接收的数据不感兴趣,并取消订阅。当调用 request() 时,Subscriber 通过传递一个 long 值的参数来表示它将会接收多少数据。这时就会引进 backpressure,用以阻止 Publisher 发送的数据超过 Subscriber 能够处理的数据。在 Publisher 发送了足够的被请求的数据后,Subscriber 可以再次调用 request() 来请求更多的数据。

一旦 Subcriber 已经接收到数据,数据就通过流开始流动了。每一个 Publisher 发布的项目都会通过调用 onNext() 方法将数据传输到 Subscriber。如果出现错误,onError() 方法将被调用。如果 Publisher 没有更多的数据需要发送了,同时也不会再生产任何数据了,将会调用 onComplete() 方法来告诉 Subscriber,它已经结束了。

对于 Processor 接口而言,它连接了 Subscriber 和 Publisher:

public interface Processor<T, R>
          extends Subscriber<T>, Publisher<R> {}

作为 Subscriber,Processor 将会接收数据然后以一定的方式处理这些数据。然后它会摇身一变,变为一个 Publisher,将处理的结果发布到 Subscriber。

正如您所看到的,响应式流规范相当地简单。关于如何从 Publisher 开始建立起一个数据处理的通道,这也是一件很容易的事情了,通过将数据不输入或是输入到多个 Processor 中,然后将最终结果传递到 Subscriber 中就行了。

然而,响应流接口不适合的是,组成这样一个功能性方式的流。Reactor 工程实现了响应式流的规范,它提供由响应式流组成的函数式 API。正如您将在后面的章节中看到的,Reactor 是 Spring 响应式编程模型的基础。在本章的剩余部分,我们将探索 Reactor 工程。

results matching ""

    No results matching ""