Java 9 Flow API 学习

响应式编程(Reactive Programming)可以理解为一种处理数据项(Data Item)的异步流,即在数据项产生的时候,接收者就对其进行响应。在响应式编程中,会有一个数据发布者(Publisher)和数据订阅者(Subscriber),后者用于异步接收发布者发布的数据。在该模式中,还引入了一个更高级的特性:数据处理器(Processor),它用于将数据发布者发布的数据进行某些转换操作,然后再发布给数据订阅者。

总之,响应式编程是异步非阻塞编程,能够提升程序性能,可以解决传统编程模型遇到的困境。基于这个模型实现的有Java 9 Flow API、RxJava和Reactor等,这里主要介绍的是Java 9 Flow API的使用。

Flow接口概览

Java 9 新增了一个Flow接口,位于java.util.concurrent路径下,构成如下图所示:

QQ截图20190226141817.png

其中Publisher为数据发布者,Subscriber为数据订阅者,Subscription为发布者和订阅者之间的订阅关系,Processor为数据处理器。

Publisher

Publisher部分的源码如下所示:

QQ截图20190226142214.png

它是一个函数式接口,只包含一个subscribe方法,通过这个方法将数据发布出去。

Subscriber

Subscriber部分的源码如下所示: QQ截图20190226142519.png

该接口包含了四个方法:

方法描述
onSubscribe订阅成功的回调方法,用于初始化Subscription,并且表明可以开始接收订阅数据了
onNext接收下一项订阅数据的回调方法
onError在Publisher或Subcriber遇到不可恢复的错误时调用此方法,Subscriber不再接收订阅消息
onComplete当接收完所有订阅数据,并且发布者已经关闭后会回调这个方法

Subscription

Subscription部分的源码如下所示:

QQ截图20190226143424.png

该接口包含了两个方法:

方法描述
request用于向数据发布者请求n个数据项
cancel取消消息订阅,订阅者将不再接收数据

Processor

Processor部分的代码如下所示:

QQ截图20190226143709.png

它是一个空接口,但是它继承了PublisherSubscriber,所以它既能发布数据也能订阅数据。基于这个特性,它可以充当数据转换的角色,先从数据发布者那接收数据项,然后经过处理后再发布给最终的数据订阅者。

发布订阅示例

接下来我们举个数据发布和数据订阅的简单示例,以此了解Java 9 Flow API的使用。先入为主,直接贴出整个示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class FlowApiTest {

public static void main(String[] args) throws InterruptedException {
// 1. 定义 String 类型的数据发布者,JDK 9自带的
// SubmissionPublisher 实现了 Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

// 2. 创建一个订阅者,用于接收发布者的消息
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
// 接收发布者发布的消息
System.out.println("【订阅者】接收消息 <------ " + item);

// 接收后再次请求一个数据
this.subscription.request(1);

// 如果不想再接收数据,也可以直接调用 cancel,表示不再接收了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 过程中出现异常会回调这个方法
System.out.println("【订阅者】数据接收出现异常," + throwable);

// 出现异常,取消订阅,告诉发布者我不再接收数据了
// 实际测试发现,只要订阅者接收消息出现异常,进入了这个回调
// 订阅者就不会再继续接收消息了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 当发布者发出的数据都被接收了,
// 并且发布者关闭后,会回调这个方法
System.out.println("【订阅者】数据接收完毕");
}
};

// 3. 发布者和订阅者需要建立关系
publisher.subscribe(subscriber);

// 4. 发布者开始发布数据
for (int i = 0; i < 10; i++) {
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息 ------> " + message);
publisher.submit(message);
}

// 5. 发布结束后,关闭发布者
publisher.close();

// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了
Thread.currentThread().join(2000);
}
}

上面使用JDK 自带的Publisher实现类SubmissionPublisher来发布 String类型的数据,然后用匿名实现类的方式创建了一个Subscriber实现类。接着使用SubmissionPublishersubscribe方法来为发布者和订阅者建立关系。建立关系后,发布者就可以发布数据,接收者也开始接收数据。详细的说明注释里都写了,这里就不再赘述代码的逻辑了。

程序的输出如下所示:

QQ截图20190226144617.png

模拟背压

所谓的背压(Backpressure)通俗的讲就是数据接收者的压力,传统模式下,发布者只关心数据的创造与发布,而当数据发布速率远高于数据接收速率的时候,数据接收者缓冲区将被填满,无法再接收数据。发布者并不关心这些,依旧不断地发送数据,所以就造成了IO阻塞。

基于响应式模型实现的Flow API可以很好地解决这个问题。在Java 9的Flow API定义中,Subscriber会将Publisher发布的数据缓冲在Subscription中,其长度默认为256:

QQ截图20190226150028.png

假如当这个缓冲区都被填满后,Publisher将会停止发送数据,直到Subscriber接收了数据Subscription有空闲位置的时候,Publisher才会继续发布数据,而非一味地发个不停。

下面用代码来演示这个情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class FlowApiTest {

public static void main(String[] args) throws InterruptedException {
// 1. 定义String类型的数据发布者,JDK 9自带的
// SubmissionPublisher实现了 Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

// 2. 创建一个订阅者,用于接收发布者的消息
Subscriber<String> subscriber = new Subscriber<>() {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
// 接收发布者发布的消息
System.out.println("【订阅者】接收消息 <------ " + item);

// 模拟接收数据缓慢,让缓冲池填满
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 接收后再次请求一个数据,表示我已经处理完了,你可以再发数据过来了
this.subscription.request(1);

// 如果不想再接收数据,也可以直接调用cancel,表示不再接收了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 过程中出现异常会回调这个方法
System.out.println("【订阅者】数据接收出现异常," + throwable);

// 出现异常,取消订阅,告诉发布者我不再接收数据了
// 实际测试发现,只要订阅者接收消息出现异常,进入了这个回调
// 订阅者就不会再继续接收消息了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 当发布者发出的数据都被接收了,
// 并且发布者关闭后,会回调这个方法
System.out.println("【订阅者】数据接收完毕");
}
};

// 3. 发布者和订阅者需要建立关系
publisher.subscribe(subscriber);

// 4. 发布者开始发布数据
for (int i = 0; i < 500; i++) {
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息 ------> " + message);
publisher.submit(message);
}

// 5. 发布结束后,关闭发布者
publisher.close();

// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了
Thread.currentThread().join(20000);
}
}

上面代码中,我们在SubscriberonNext方法中用下面的代码模拟延迟,让数据处理过程维持在2秒左右:

1
2
3
4
5
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

然后数据发布量调整到了500,当程序启动的时候,由于数据发布的速度非常快(普通for循环),所以数据订阅者的数据缓冲区瞬间被填满,于是你会看到下面这个情况,只有当数据订阅者处理了一个数据的时候,数据发布者才会相应地再次发布一个新数据:

testasdfasdf.gif

Processor示例

Processor的使用也很简单,其实它就是PublisherSubscriber的结合体,充当数据处理的角色,通常的做法是用它来接收发布者发布的消息,然后进行相应的处理,再将数据发布出去,供消息订阅者接收。

下面是一个Processor用法的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class FlowApiTest2 {
static class MyProcessor extends SubmissionPublisher<String>
implements Processor<String, String> {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
// 接收发布者发布的消息
System.out.println("【处理器】接收消息 <------ " + item);

// 处理器将消息进行转换
String newItem = "【处理器加工后的数据: " + item + "】";

this.submit(newItem);

// 接收后再次请求一个数据,表示我已经处理完了,你可以再发数据过来了
this.subscription.request(1);

// 如果不想再接收数据,也可以直接调用cancel,表示不再接收了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 过程中出现异常会回调这个方法
System.out.println("【处理器】数据接收出现异常," + throwable);

// 出现异常,取消订阅,告诉发布者我不再接收数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
System.out.println("【处理器】数据处理完毕");

// 处理器处理完数据后关闭
this.close();
}
}

public static void main(String[] args) throws InterruptedException {
// 1. 定义String类型的数据发布者,JDK 9自带的
// SubmissionPublisher实现了 Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

// 2. 创建处理器,用于接收发布者发布的消息,
// 转换后再发送给订阅者
MyProcessor processor = new MyProcessor();

// 3. 发布者和处理器建立订阅的关系
publisher.subscribe(processor);

// 4.创建一个订阅者,用于接收处理器的消息
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}

@Override
public void onNext(String item) {
System.out.println("【订阅者】接收消息 <------ " + item + "");
this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
System.out.println("【订阅者】数据接收出现异常," + throwable);
this.subscription.cancel();
}

@Override
public void onComplete() {
System.out.println("【订阅者】数据接收完毕");
}
};

// 5. 处理器和订阅者建立订阅关系
processor.subscribe(subscriber);

// 6. 发布者开始发布数据
for (int i = 0; i < 10; i++) {
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息 ------> " + message);
publisher.submit(message);
}

// 7. 发布结束后,关闭发布者
publisher.close();

// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了
Thread.currentThread().join(2000);
}
}

程序运行结果如下所示:

QQ截图20190226151701.png

参考文档:https://community.oracle.com/docs/DOC-1006738


TOP