6.3 Reactor
Reactor是第四代Reactive库,基于Reactive Streams规范在JVM上构建非阻塞应用程序。Reactor侧重于服务器端响应式编程,是一个基于Java 8实现的响应式流规范(Reactive Streams specification)响应式库。
作为Reactive Engine/SPI,Reactor Core和IO模块都为重点使用场景提供了响应流构造,最终与Spring、RxJava、Akka Streams和Ratpack等框架结合使用,作为Reactive API,Reactor框架模块提供了丰富的消费功能,如组合和发布订阅事件。
本节对Reactor的介绍以基本的概念和简单使用为主,更多Reactor高级特性可参考Reactor官网:http://projectreactor.io/。
6.3.1 Flux与Mono
在Reactor中,数据流发布者(Publisher)由Flux和Mono两个类表示,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0个或多个(0..N)元素的响应式序列,而一个Mono对象代表一个包含0或一个(0..1)元素的结果。
作为数据流发布者,Flux和Mono都可以发出三种数据信号,元素值、错误信号和完成信号。错误信号和完成信号都是终止信号。完成信号用来告知下游订阅者,数据流是正常结束的。错误信号在终止数据流的同时将错误信息传递给下游订阅者。这三种信号不是一定要完全具备的。
图6-1所示是一个Flux类型的数据流,横坐标是时间轴,⑥后的黑色竖线是完成信号。连续发出1~6共6个元素值,以及一个完成信号,完成信号告知订阅者数据流已经结束。
图6-1 Flux类型的数据流图
图6-2是一个Mono类型的数据流,其发出一个元素值后,立刻发出一个完成信号。
图6-2 Mono类型的数据流图
下面通过案例分析Reactor的使用。
首先创建一个maven项目,然后在pom.xml中加入对maven的依赖。可以到maven仓库https://mvnrepository.com/查询最新版本的Reactor。截止本书出版,Reactor最新的版本是3.2.0.RELEASE。
为了方便测试,还需要添加对reactor-test的依赖和Junit的依赖。
下面就可以开始用Reactor进行编码了。
首先使用代码声明图6-1和图6-2中的Flux和Mono,代码如下:
Flux.just(1, 2, 3, 4, 5, 6); Mono.just(1);
Flux和Mono提供了多种创建数据流的方法,just是一种比较直接的声明数据流的方式,其参数就是数据元素。
对于图6-1中的场景,还可以使用如下多种声明方式。
基于数组的声明方式:
Integer[] array = new Integer[]{1,2,3,4,5,6}; Flux.fromArray(array);
基于集合的声明方式:
List<Integer> list = Arrays.asList(array); Flux.fromIterable(list);
基于Stream的声明方式:
Stream<Integer> stream = list.stream(); Flux.fromStream(stream);
上文中提到元素值、错误信号和完成信号三者并不是要完全具备的,下面就给出几种情况:
// 只有完成信号的空数据流 Flux.just(); Flux.empty(); Mono.empty(); Mono.justOrEmpty(Optional.empty()); // 只有错误信号的数据流 Flux.error(new Exception("some error")); Mono.error(new Exception("some error"));
6.3.2 subscribe()
subscribe()方法表示对数据流的订阅动作,subscribe()方法有多个重载的方法,下面介绍几种常见的subscribe()方法:
下面通过一个案例验证Flux、Mono和几种常见的subscribe()方法的使用。案例代码如下:
/** * @Author zhouguanya * @Date 2018/10/22 * @Description 第一个Reactor程序 */ public class FirstReactorDemo { public static void main(String[] args) { // 测试Flux Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print); System.out.println("\n----------------------------"); // 测试Mono Mono.just(1).subscribe(System.out::println); System.out.println("----------------------------"); // 测试两个参数的subscribe方法 Flux.just(1, 2, 3, 4, 5, 6) .subscribe(System.out::print, System.err::println); System.out.println("\n----------------------------"); // 测试三个参数的subscribe方法 Flux.just(1, 2, 3, 4, 5, 6) .subscribe(System.out::print, System.err::println, () -> System.out.println("\ncomplete")); System.out.println("----------------------------"); // 测试四个参数的subscribe方法 Flux.just(1, 2, 3, 4, 5, 6) .subscribe(System.out::print, System.err::println, () -> System.out.println("\ncomplete"), subscription -> { System.out.println("订阅发生了"); subscription.request(10); }); } }
运行案例代码,得到如下运行结果:
123456 ---------------------------- 1 ---------------------------- 123456 ---------------------------- 123456 complete ---------------------------- 订阅发生了 123456 complete
在命令式或同步式编程世界中,调试通常都是非常直观的——直接看stack trace就可以找到问题出现的位置以及异常信息等。
当切换到响应式的异步代码,事情就变得复杂多了。先了解一个基本的单元测试工具——StepVerifier。当测试关注点是每个数据元素的时候,就与StepVerifier的使用场景非常贴切。例如期望的数据或信号是什么,是否使用Flux发出某个特殊值,接下来100ms做什么,这些场景都可以使用StepVerifier API表示。
下面分别使用StepVerifier测试Flux和Mono,测试代码如下:
/** * @Author zhouguanya * @Date 2018/10/25 * @Description StepVerifier测试案例 */ public class StepVerifierDemo { public static void main(String[] args) { Flux flux = Flux.just(1, 2, 3, 4, 5, 6); // 使用StepVerifier测试Flux,应该正常 StepVerifier.create(flux) //测试下一个期望的数据元素 .expectNext(1, 2, 3, 4, 5, 6) //测试下一个元素是否为完成信号 .expectComplete() .verify(); Mono mono = Mono.error(new Exception("some error")); // 使用StepVerifier测试Mono,应该会出现异常 StepVerifier.create(mono) //测试下一个元素是否为完成信号 .expectComplete() .verify(); } }
运行测试代码,测试结果如下:
6.3.3 操作符(Operator)
本节介绍Reactor一些常用的操作符。
1. map
map可以将数据元素转换成映射表,得到一个新的元素。map操作符示意图如图6-3所示。
图中上方的箭头是原始序列的时间轴,下方的箭头是经过map处理后的数据序列时间轴。map接受一个Function函数式接口,该接口用于定义转换操作的策略:
public final <V> Flux<V> map(Function<? super T,? extends V> mapper) public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)
图6-3 map操作示意图
下面使用案例阐述map操作符的用法。案例代码如下:
执行案例代码发现控制台无异常输出。如果修改立方后的数据为expectNext(10, 8, 27, 64, 125,216)将会出现如下异常:
2. flatMap
flatMap操作可以将每个数据元素转换/映射为各个流,然后将每个流合并为一个大的数据流。flatMap操作符示意图如图6-4所示。
图6-4 flatMap操作示意图
flatMap接收一个Function函数式接口为参数,这个函数式的输入为一个T类型数据值,输出可以是Flux或Mono:
下面使用案例阐述flatMap操作符的用法。案例代码如下:
执行案例代码,得到类似如下结果:
fmlounxo
多次执行案例代码,会得到不同的输出结果。由此可以看出,流的合并是异步的,先来先到,并非是严格按照原始序列的顺序。
3. filter
filter操作可以对数据元素过滤,得到剩余的元素。filter操作符示意图如图6-5所示。
图6-5 filter操作示意图
filter接受一个Predicate的函数式接口为参数,这个函数式接口的作用是进行判断并返回boolean值:
public final Flux<T> filter(Predicate<? super T> tester) public final Mono<T> filter(Predicate<? super T> tester)
下面使用案例阐述filter操作符的用法。案例代码如下:
执行案例代码发现控制台无异常输出。如果修改立方后的数据为expectNext(1, 127, 125)将会出现如下异常:
4. zip
zip能够将多个流一对一的合并起来。zip有多个方法变体,这里只介绍一个最常见的二合一的场景。zip操作符示意图如图6-6所示。
图6-6 zip操作示意图
zip可以从两个Flux/Mono流中,每次各取一个元素,组成一个二元组:
下面使用案例阐述zip操作符的用法。案例代码如下:
执行案例代码,得到如下结果:
[I,0][am,1][Reactor,2]
5. 更多
(1)除了以上几个常见的操作符意外,Reactor中提供了非常丰富的操作符。
(2)用于编程方式自定义生成数据流的create和generate等及其变体方法。
(3)用于“无副作用的peek”场景的doOnNext、doOnError、doOncomplete、doOnSubscribe、doOnCancel等及其变体方法。
(4)用于数据流转换的when、and/or、merge、concat、collect、count、repeat等及其变体方法。
(5)用于过滤/拣选的take、first、last、sample、skip、limitRequest等及其变体方法。
(6)用于错误处理的timeout、onErrorReturn、onErrorResume、doFinally、retryWhen等及其变体方法。
(7)用于分批的window、buffer、group等及其变体方法。
(8)用于线程调度的publishOn和subscribeOn方法。
更多操作请见官方文档:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
6.3.4 线程模型
JDK提供的多线程工具类Executors提供了多种线程池,使开发人员可以方便地定义线程池进行多线程开发。Reactor使多线程编程更加容易,Schedulers类提供的静态方法可以更快创建以下几种多线程环境。
• 获取当前线程环境Schedulers.immediate()。
• 获取可重用的单线程环境Schedulers.single()。
• 获取弹性线程池环境Schedulers.elastic()。
• 获取固定大小线程池环境Schedulers.parallel()。
• 获取自定义线程池环境Schedulers.fromExecutorService(ExecutorService) 。
下面通过案例对比单线程同步阻塞和使用Schedulers异步非阻塞的场景。案例中分别有两个方法,hello()方法同步阻塞2s后,返回字符串“Hello, Reactor!”,helloAsync()方法使用Schedulers改进为异步非阻塞方式。案例代码如下:
执行案例代码,得到如下执行结果:
Hello, Reactor! --------同步阻塞场景执行结束-------- -------异步非阻塞场景执行结束------- Hello, Reactor!
观察执行结果,可以发现hello()方法是同步阻塞输出“Hello, Reactor!”,helloAsync()方法是异步非阻塞输出“Hello, Reactor!”的。