Java是一个“古老”并且广泛应用的编程语言,但Java9中引入了一些新鲜有趣的特性。这篇文章主要介绍FlowAPI这个新特性,通过FlowAPI我们仅仅使用JDK就能够搭建响应式应用程序,而不需要其他额外的类库,如RxJava或Project Reactor。
尽管如此,当你看到过接口文档后你就会明白到正如字面所说,这只是一个API而已。她仅仅包含了一些Interface和一个实现类:
1.Interface Flow.Publisher定义了生产数据和控制事件的方法。 2.Interface Flow.Subscriber 定义了消费数据和事件的方法。 3.Interface Flow.Subscription 定义了链接Publisher和Subscriber的方法。 4.Interface Flow.Processor 定义了转换Publisher到Subscriber的方法 5.最后,class SubmissionPublisher 是Flow.Publisher 的实现,她可以灵活的生产数据,同时与Reactive Stream兼容。
虽然Java9中没有很多FlowAPI的实现类可供我们使用,但是依靠这些接口第三方可以提供的响应式编程得到了规范和统一,比如从JDBC driver到RabbitMQ的响应式实现。
其中Publisher为数据发布者,Subscriber为数据订阅者,Subscription为发布者和订阅者之间的订阅关系,Processor为数据处理器。
- 关系图:
我对响应式编程的理解是, 这是一种数据消费者控制数据流的编程方式。需要指出是,当消费速度低于生产速度时,消费者要求生产者降低速度以完全消费数据(这个现象称作back-pressure(背压))。这种处理方式不是在制造混乱,你可能已经使用过这种模式,只是最近因为在主要框架和平台上使用才变得更流行,比如Java9,Spring5。另外在分布式系统中处理大规模数据传输时也使用到了这种模式。
回顾过去可以帮我们更好的理解这种模式。
- pull模式
几年前,最常见的消费数据模式是pull-based。client端不断轮询服务端以获取数据。这种模式的优点是当client端资源有限时可以更好的控制数据流(停止轮询),而缺点是当服务端没有数据时轮询是对计算资源和网络资源的浪费。
- push模式
随着时间推移,处理数据的模式转变为push-based,生产者不关心消费者的消费能力,直接推送数据。这种模式的缺点是当消费资源低于生产资源时会造成缓冲区溢出从而数据丢失,当丢失率维持在较小的数值时还可以接受,但是当这个比率变大时我们会希望生产者降速以避免大规模数据丢失。
- pull-push模式
响应式编程是一种pull-push混合模式以综合他们的优点,这种模式下消费者负责请求数据以控制生产者数据流,同时当处理资源不足时也可以选择阻断或者丢弃数据,接下来我们会看到一个典型案例。
三、Flow与Stream响应式编程并不是为了替换传统编程,其实两者相互兼容而且可以互相协作完成任务。Java8中引入的StreamAPI通过map,reduce以及其他操作可以完美的处理数据集,而FlowAPI则专注于处理数据的流通,比如对数据的请求,减速,丢弃,阻塞等。同时你可以使用Streams作为数据源(publisher),当必要时阻塞丢弃其中的数据。你也可以在Subscriber中使用Streams以进行数据的归并操作。更值得一提的是:reactive streams不仅兼容传统编程方式,而且还支持函数式编程以极大的提高可读性和可维护性。有一点可能会使我们感到困惑:如果你需要在两个系统间传输数据,同时进行转形操作,如何使用Flows和Streams来完成?这种情况下,我们使用Java8的Function来做数据转换,但是如何在Publisher和Subscriber之间使用StreamAPI呢?答案是我们可以在Publisher和Subscriber之间再加一个subscriber,她可以从最初的publisher获取数据,转换,然后再作为一个新的publisher,而使最初的subscriber订阅这个新的publisher,也是Java9中的接口Flow.Processor
- Publisher
Publisher部分的源码如下所示:
它是一个函数式接口,只包含一个subscribe方法,通过这个方法将数据发布出去。
- Subscriber
Subscriber部分的源码如下所示:
该接口包含了四个方法:
- Subscription
Subscription部分的源码如下所示:
- Processor
Processor部分的代码如下所示:
它是一个空接口,但是它继承了Publisher和Subscriber,所以它既能发布数据也能订阅数据。基于这个特性,它可以充当数据转换的角色,先从数据发布者那接收数据项,然后经过处理后再发布给最终的数据订阅者。
- 发布订阅示例
接下来我们举个数据发布和数据订阅的简单示例,以此了解Java 9 Flow API的使用。先入为主,直接贴出整个示例代码:
public class FlowApiTest {
public static void main(String[] args) throws InterruptedException {
// 1. 定义 String 类型的数据发布者,JDK 9自带的
// SubmissionPublisher 实现了 Publisher
SubmissionPublisher publisher = new SubmissionPublisher<>();
// 2. 创建一个订阅者,用于接收发布者的消息
Subscriber subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收发布者发布的消息
System.out.println("【订阅者】接收消息 <------ " + item);
// 接收后再次请求一个数据
this.subscription.request(1);
// 如果不想再接收数据,也可以直接调用 cancel,表示不再接收了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 过程中出现异常会回调这个方法
System.out.println("【订阅者】数据接收出现异常," + throwable);
// 出现异常,取消订阅,告诉发布者我不再接收数据了
// 实际测试发现,只要订阅者接收消息出现异常,进入了这个回调
// 订阅者就不会再继续接收消息了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 当发布者发出的数据都被接收了,
// 并且发布者关闭后,会回调这个方法
System.out.println("【订阅者】数据接收完毕");
}
};
// 3. 发布者和订阅者需要建立关系
publisher.subscribe(subscriber);
// 4. 发布者开始发布数据
for (int i = 0; i < 10; i++) {
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息 ------> " + message);
publisher.submit(message);
}
// 5. 发布结束后,关闭发布者
publisher.close();
// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了
Thread.currentThread().join(2000);
}
}
上面使用JDK 自带的Publisher实现类SubmissionPublisher来发布 String类型的数据,然后用匿名实现类的方式创建了一个Subscriber实现类。接着使用SubmissionPublisher的subscribe方法来为发布者和订阅者建立关系。建立关系后,发布者就可以发布数据,接收者也开始接收数据。详细的说明注释里都写了,这里就不再赘述代码的逻辑了。
- 模拟背压
所谓的背压(Backpressure)通俗的讲就是数据接收者的压力,传统模式下,发布者只关心数据的创造与发布,而当数据发布速率远高于数据接收速率的时候,数据接收者缓冲区将被填满,无法再接收数据。发布者并不关心这些,依旧不断地发送数据,所以就造成了IO阻塞。基于响应式模型实现的Flow API可以很好地解决这个问题。在Java 9的Flow API定义中,Subscriber会将Publisher发布的数据缓冲在Subscription中,其长度默认为256:
假如当这个缓冲区都被填满后,Publisher将会停止发送数据,直到Subscriber接收了数据Subscription有空闲位置的时候,Publisher才会继续发布数据,而非一味地发个不停。下面用代码来演示这个情况:
public class BPFlowTest {
public static void main(String[] args) throws InterruptedException {
// 1. 定义String类型的数据发布者,JDK 9自带的
// SubmissionPublisher实现了 Publisher
SubmissionPublisher publisher = new SubmissionPublisher<>();
// 2. 创建一个订阅者,用于接收发布者的消息
Subscriber subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收发布者发布的消息
System.out.println("【订阅者】接收消息 <------ " + item);
// 模拟接收数据缓慢,让缓冲池填满
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 接收后再次请求一个数据,表示我已经处理完了,你可以再发数据过来了
this.subscription.request(1);
// 如果不想再接收数据,也可以直接调用cancel,表示不再接收了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 过程中出现异常会回调这个方法
System.out.println("【订阅者】数据接收出现异常," + throwable);
// 出现异常,取消订阅,告诉发布者我不再接收数据了
// 实际测试发现,只要订阅者接收消息出现异常,进入了这个回调
// 订阅者就不会再继续接收消息了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 当发布者发出的数据都被接收了,
// 并且发布者关闭后,会回调这个方法
System.out.println("【订阅者】数据接收完毕");
}
};
// 3. 发布者和订阅者需要建立关系
publisher.subscribe(subscriber);
// 4. 发布者开始发布数据
for (int i = 0; i < 500; i++) {
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息 ------> " + message);
publisher.submit(message);
}
// 5. 发布结束后,关闭发布者
publisher.close();
// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了
Thread.currentThread().join(20000);
}
}
上面代码中,我们在Subscriber的onNext方法中用下面的代码模拟延迟,让数据处理过程维持在2秒左右:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
然后数据发布量调整到了500,当程序启动的时候,由于数据发布的速度非常快(普通for循环),所以数据订阅者的数据缓冲区瞬间被填满,于是你会看到下面这个情况,只有当数据订阅者处理了一个数据的时候,数据发布者才会相应地再次发布一个新数据:
- Processor示例
Processor的使用也很简单,其实它就是Publisher和Subscriber的结合体,充当数据处理的角色,通常的做法是用它来接收发布者发布的消息,然后进行相应的处理,再将数据发布出去,供消息订阅者接收。下面是一个Processor用法的简单示例:
public class ProcessorTest {
static class MyProcessor extends SubmissionPublisher implements Processor {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收发布者发布的消息
System.out.println("【处理器】接收消息 <------ " + item);
// 处理器将消息进行转换
String newItem = "【处理器加工后的数据: " + item + "】";
this.submit(newItem);
// 接收后再次请求一个数据,表示我已经处理完了,你可以再发数据过来了
this.subscription.request(1);
// 如果不想再接收数据,也可以直接调用cancel,表示不再接收了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 过程中出现异常会回调这个方法
System.out.println("【处理器】数据接收出现异常," + throwable);
// 出现异常,取消订阅,告诉发布者我不再接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("【处理器】数据处理完毕");
// 处理器处理完数据后关闭
this.close();
}
}
public static void main(String[] args) throws InterruptedException {
// 1. 定义String类型的数据发布者,JDK 9自带的
// SubmissionPublisher实现了 Publisher
SubmissionPublisher publisher = new SubmissionPublisher<>();
// 2. 创建处理器,用于接收发布者发布的消息,
// 转换后再发送给订阅者
MyProcessor processor = new MyProcessor();
// 3. 发布者和处理器建立订阅的关系
publisher.subscribe(processor);
// 4.创建一个订阅者,用于接收处理器的消息
Subscriber subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("【订阅者】接收消息 <------ " + item + "");
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("【订阅者】数据接收出现异常," + throwable);
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("【订阅者】数据接收完毕");
}
};
// 5. 处理器和订阅者建立订阅关系
processor.subscribe(subscriber);
// 6. 发布者开始发布数据
for (int i = 0; i < 10; i++) {
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息 ------> " + message);
publisher.submit(message);
}
// 7. 发布结束后,关闭发布者
publisher.close();
// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了
Thread.currentThread().join(2000);
}
}
参考文章
参考文章



