附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器
本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor Operators
本节的内容来自我翻译的Reactor 3 参考文档——如何选择操作符。由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅。
如果一个操作符是专属于
Flux或Mono的,那么会给它注明前缀。
公共的操作符没有前缀。如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,
会以一个点(.)开头,并将参数置于圆括号内,比如:.methodCall(parameter)。
1)创建一个新序列,它...
- 发出一个
T,我已经有了:just- ...基于一个
Optional<T>:Mono#justOrEmpty(Optional<T>) - ...基于一个可能为
null的 T:Mono#justOrEmpty(T)
- ...基于一个
- 发出一个
T,且还是由just方法返回- ...但是“懒”创建的:使用
Mono#fromSupplier或用defer包装just
- ...但是“懒”创建的:使用
- 发出许多
T,这些元素我可以明确列举出来:Flux#just(T...) - 基于迭代数据结构:
- 一个数组:
Flux#fromArray - 一个集合或 iterable:
Flux#fromIterable - 一个 Integer 的 range:
Flux#range - 一个
Stream提供给每一个订阅:Flux#fromStream(Supplier<Stream>)
- 一个数组:
- 基于一个参数值给出的源:
- 一个
Supplier<T>:Mono#fromSupplier - 一个任务:
Mono#fromCallable,Mono#fromRunnable - 一个
CompletableFuture<T>:Mono#fromFuture
- 一个
- 直接完成:
empty - 立即生成错误:
error- ...但是“懒”的方式生成
Throwable:error(Supplier<Throwable>)
- ...但是“懒”的方式生成
- 什么都不做:
never - 订阅时才决定:
defer - 依赖一个可回收的资源:
using - 可编程地生成事件(可以使用状态):
- 同步且逐个的:
Flux#generate - 异步(也可同步)的,每次尽可能多发出元素:
Flux#create
(Mono#create也是异步的,只不过只能发一个)
- 同步且逐个的:
2)对序列进行转化
-
我想转化一个序列:
- 1对1地转化(比如字符串转化为它的长度):
map - ...类型转化:
cast - ...为了获得每个元素的序号:
Flux#index - 1对n地转化(如字符串转化为一串字符):
flatMap+ 使用一个工厂方法 - 1对n地转化可自定义转化方法和/或状态:
handle - 对每一个元素执行一个异步操作(如对 url 执行 http 请求):
flatMap+ 一个异步的返回类型为Publisher的方法 - ...忽略一些数据:在 flatMap lambda 中根据条件返回一个
Mono.empty() - ...保留原来的序列顺序:
Flux#flatMapSequential(对每个元素的异步任务会立即执行,但会将结果按照原序列顺序排序) - ...当 Mono 元素的异步任务会返回多个元素的序列时:
Mono#flatMapMany
- 1对1地转化(比如字符串转化为它的长度):
-
我想添加一些数据元素到一个现有的序列:
- 在开头添加:
Flux#startWith(T...) - 在最后添加:
Flux#concatWith(T...)
- 在开头添加:
-
我想将
Flux转化为集合(一下都是针对Flux的)- 转化为 List:
collectList,collectSortedList - 转化为 Map:
collectMap,collectMultiMap - 转化为自定义集合:
collect - 计数:
count - reduce 算法(将上个元素的reduce结果与当前元素值作为输入执行reduce方法,如sum)
reduce - ...将每次 reduce 的结果立即发出:
scan - 转化为一个 boolean 值:
- 对所有元素判断都为true:
all - 对至少一个元素判断为true:
any - 判断序列是否有元素(不为空):
hasElements - 判断序列中是否有匹配的元素:
hasElement
- 转化为 List:
-
我想合并 publishers...
- 按序连接:
Flux#concat或.concatWith(other) - ...即使有错误,也会等所有的 publishers 连接完成:
Flux#concatDelayError - ...按订阅顺序连接(这里的合并仍然可以理解成序列的连接):
Flux#mergeSequential - 按元素发出的顺序合并(无论哪个序列的,元素先到先合并):
Flux#merge/.mergeWith(other) - ...元素类型会发生变化:
Flux#zip/Flux#zipWith - 将元素组合:
- 2个 Monos 组成1个
Tuple2:Mono#zipWith - n个 Monos 的元素都发出来后组成一个 Tuple:
Mono#zip - 在终止信号出现时“采取行动”:
- 在 Mono 终止时转换为一个
Mono<Void>:Mono#and - 当 n 个 Mono 都终止时返回
Mono<Void>:Mono#when - 返回一个存放组合数据的类型,对于被合并的多个序列:
- 每个序列都发出一个元素时:
Flux#zip - 任何一个序列发出元素时:
Flux#combineLatest
- 每个序列都发出一个元素时:
- 只取各个序列的第一个元素:
Flux#first,Mono#first,mono.or<br/>(otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux) - 由一个序列触发(类似于
flatMap,不过“喜新厌旧”):switchMap - 由每个新序列开始时触发(也是“喜新厌旧”风格):
switchOnNext
- 按序连接:
-
我想重复一个序列:
repeat- ...但是以一定的间隔重复:
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
- ...但是以一定的间隔重复:
-
我有一个空序列,但是...
- 我想要一个缺省值来代替:
defaultIfEmpty - 我想要一个缺省的序列来代替:
switchIfEmpty
- 我想要一个缺省值来代替:
-
我有一个序列,但是我对序列的元素值不感兴趣:
ignoreElements- ...并且我希望用
Mono来表示序列已经结束:then - ...并且我想在序列结束后等待另一个任务完成:
thenEmpty - ...并且我想在序列结束之后返回一个
Mono:Mono#then(mono) - ...并且我想在序列结束之后返回一个值:
Mono#thenReturn(T) - ...并且我想在序列结束之后返回一个
Flux:thenMany
- ...并且我希望用
-
我有一个 Mono 但我想延迟完成...
- ...当有1个或N个其他 publishers 都发出(或结束)时才完成:
Mono#delayUntilOther - ...使用一个函数式来定义如何获取“其他 publisher”:
Mono#delayUntil(Function)
- ...当有1个或N个其他 publishers 都发出(或结束)时才完成:
- 我想基于一个递归的生成序列的规则扩展每一个元素,然后合并为一个序列发出:
- ...广度优先:
expand(Function) - ...深度优先:
expandDeep(Function)
- ...广度优先:
3)“窥视”(只读)序列
-
再不对序列造成改变的情况下,我想:
- 得到通知或执行一些操作:
- 发出元素:
doOnNext - 序列完成:
Flux#doOnComplete,Mono#doOnSuccess - 因错误终止:
doOnError - 取消:
doOnCancel - 订阅时:
doOnSubscribe - 请求时:
doOnRequest - 完成或错误终止:
doOnTerminate(Mono的方法可能包含有结果)- 但是在终止信号向下游传递 之后 :
doAfterTerminate
- 但是在终止信号向下游传递 之后 :
- 所有类型的信号(
Signal):Flux#doOnEach - 所有结束的情况(完成complete、错误error、取消cancel):
doFinally - 记录日志:
log
- 我想知道所有的事件:
- 每一个事件都体现为一个
single对象: - 执行 callback:
doOnEach - 每个元素转化为
single对象:materialize- ...在转化回元素:
dematerialize
- ...在转化回元素:
- 转化为一行日志:
log
- 每一个事件都体现为一个
4)过滤序列
-
我想过滤一个序列
- 基于给定的判断条件:
filter - ...异步地进行判断:
filterWhen - 仅限于指定类型的对象:
ofType - 忽略所有元素:
ignoreElements - 去重:
- 对于整个序列:
Flux#distinct - 去掉连续重复的元素:
Flux#distinctUntilChanged
- 基于给定的判断条件:
-
我只想要一部分序列:
- 只要 N 个元素:
- 从序列的第一个元素开始算:
Flux#take(long)- ...取一段时间内发出的元素:
Flux#take(Duration) - ...只取第一个元素放到
Mono中返回:Flux#next() - ...使用
request(N)而不是取消:Flux#limitRequest(long)
- ...取一段时间内发出的元素:
- 从序列的最后一个元素倒数:
Flux#takeLast - 直到满足某个条件(包含):
Flux#takeUntil(基于判断条件),Flux#takeUntilOther(基于对 publisher 的比较) - 直到满足某个条件(不包含):
Flux#takeWhile - 最多只取 1 个元素:
- 给定序号:
Flux#elementAt - 最后一个:
.takeLast(1)- ...如果为序列空则发出错误信号:
Flux#last() - ...如果序列为空则返回默认值:
Flux#last(T)
- ...如果为序列空则发出错误信号:
- 跳过一些元素:
- 从序列的第一个元素开始跳过:
Flux#skip(long)- ...跳过一段时间内发出的元素:
Flux#skip(Duration)
- ...跳过一段时间内发出的元素:
- 跳过最后的 n 个元素:
Flux#skipLast - 直到满足某个条件(包含):
Flux#skipUntil(基于判断条件),Flux#skipUntilOther(基于对 publisher 的比较) - 直到满足某个条件(不包含):
Flux#skipWhile - 采样:
- 给定采样周期:
Flux#sample(Duration)- 取采样周期里的第一个元素而不是最后一个:
sampleFirst
- 取采样周期里的第一个元素而不是最后一个:
- 基于另一个 publisher:
Flux#sample(Publisher) - 基于 publisher“超时”:
Flux#sampleTimeout(每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)
- 我只想要一个元素(如果多于一个就返回错误)...
- 如果序列为空,发出错误信号:
Flux#single() - 如果序列为空,发出一个缺省值:
Flux#single(T) - 如果序列为空就返回一个空序列:
Flux#singleOrEmpty
- 如果序列为空,发出错误信号:
5)错误处理
-
我想创建一个错误序列:
error...- ...替换一个完成的
Flux:.concat(Flux.error(e)) - ...替换一个完成的
Mono:.then(Mono.error(e)) - ...如果元素超时未发出:
timeout - ...“懒”创建:
error(Supplier<Throwable>)
- ...替换一个完成的
-
我想要类似 try/catch 的表达方式:
- 抛出异常:
error - 捕获异常:
- 然后返回缺省值:
onErrorReturn - 然后返回一个
Flux或Mono:onErrorResume - 包装异常后再抛出:
.onErrorMap(t -> new RuntimeException(t)) - finally 代码块:
doFinally - Java 7 之后的 try-with-resources 写法:
using工厂方法
- 抛出异常:
-
我想从错误中恢复...
- 返回一个缺省的:
- 的值:
onErrorReturn Publisher:Flux#onErrorResume和Mono#onErrorResume- 重试:
retry - ...由一个用于伴随 Flux 触发:
retryWhen
- 我想处理回压错误(向上游发出“MAX”的 request,如果下游的 request 比较少,则应用策略)...
- 抛出
IllegalStateException:Flux#onBackpressureError - 丢弃策略:
Flux#onBackpressureDrop - ...但是不丢弃最后一个元素:
Flux#onBackpressureLatest - 缓存策略(有限或无限):
Flux#onBackpressureBuffer - ...当有限的缓存空间用满则应用给定策略:
Flux#onBackpressureBuffer带有策略BufferOverflowStrategy
- 抛出
6) 基于时间的操作
-
我想将元素转换为带有时间信息的
Tuple2<Long, T>...- 从订阅时开始:
elapsed - 记录时间戳:
timestamp
- 从订阅时开始:
-
如果元素间延迟过长则中止序列:
timeout -
以固定的周期发出元素:
Flux#interval -
在一个给定的延迟后发出
0:staticMono.delay. - 我想引入延迟:
- 对每一个元素:
Mono#delayElement,Flux#delayElements - 延迟订阅:
delaySubscription
- 对每一个元素:
7)拆分 Flux
-
我想将一个
Flux<T>拆分为一个Flux<Flux<T>>:- 以个数为界:
window(int) - ...会出现重叠或丢弃的情况:
window(int, int) - 以时间为界:
window(Duration) - ...会出现重叠或丢弃的情况:
window(Duration, Duration) - 以个数或时间为界:
windowTimeout(int, Duration) - 基于对元素的判断条件:
windowUntil - ...触发判断条件的元素会分到下一波(
cutBefore变量):.windowUntil(predicate, true) - ...满足条件的元素在一波,直到不满足条件的元素发出开始下一波:
windowWhile(不满足条件的元素会被丢弃) - 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:
window(Publisher),windowWhen
- 以个数为界:
-
我想将一个
Flux<T>的元素拆分到集合...- 拆分为一个一个的
List: - 以个数为界:
buffer(int)- ...会出现重叠或丢弃的情况:
buffer(int, int)
- ...会出现重叠或丢弃的情况:
- 以时间为界:
buffer(Duration)- ...会出现重叠或丢弃的情况:
buffer(Duration, Duration)
- ...会出现重叠或丢弃的情况:
- 以个数或时间为界:
bufferTimeout(int, Duration) - 基于对元素的判断条件:
bufferUntil(Predicate)- ...触发判断条件的元素会分到下一个buffer:
.bufferUntil(predicate, true) - ...满足条件的元素在一个buffer,直到不满足条件的元素发出开始下一buffer:
bufferWhile(Predicate)
- ...触发判断条件的元素会分到下一个buffer:
- 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:
buffer(Publisher),bufferWhen - 拆分到指定类型的 "collection":
buffer(int, Supplier<C>)
- 拆分为一个一个的
- 我想将
Flux<T>中具有共同特征的元素分组到子 Flux:groupBy(Function<T,K>)(注意返回值是Flux<GroupedFlux<K, T>>,每一个GroupedFlux具有相同的 key 值K,可以通过key()方法获取)。
8)回到同步的世界
-
我有一个
Flux<T>,我想:- 在拿到第一个元素前阻塞:
Flux#blockFirst - ...并给出超时时限:
Flux#blockFirst(Duration) - 在拿到最后一个元素前阻塞(如果序列为空则返回 null):
Flux#blockLast - ...并给出超时时限:
Flux#blockLast(Duration) - 同步地转换为
Iterable<T>:Flux#toIterable - 同步地转换为 Java 8
Stream<T>:Flux#toStream
- 在拿到第一个元素前阻塞:
- 我有一个
Mono<T>,我想:- 在拿到元素前阻塞:
Mono#block - ...并给出超时时限:
Mono#block(Duration) - 转换为
CompletableFuture<T>:Mono#toFuture
- 在拿到元素前阻塞: