有几种方法可以被视为方便的异常抛出方法:
使用处理元素 Flux/Mono.handle
可以简化可能导致错误或空流的元素处理的一种方法是operator
handle。
以下代码显示了如何使用它来解决问题:
Mono.just(userId) .map(repo::findById) .handle((user, sink) -> { if(!isValid(user)){ sink.error(new InvalidUserException()); } else if (isSendable(user)) sink.next(user); } else { //just ignore element } })如我们所见,
.handle操作员需要通过
BiConsumer<T,SynchronousSink<>才能处理元素。在这里,我们的BiConsumer中有两个参数。第一个元素是上游元素,第二个元素是上游元素,
SynchronousSink它帮助我们向下游同步提供元素。这样的技术扩展了提供元素处理的不同结果的能力。例如,如果元素无效,我们可以向该元素提供错误,
SycnchronousSync这将取消上游
onError并向下游产生信号。反过来,我们可以使用相同的
handle运算符来“过滤”
。一旦手柄
BiConsumer被执行并且没有提供任何元素,Reactor会将其视为一种过滤,并将为我们请求其他元素。最后,在元素有效的情况下,我们可以简单地在
SynchronousSink#next下游调用和传播我们的元素,或者在其上应用一些映射,因此这里将
handle作为
map操作符。此外,我们可以安全地使用该运算符,而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。
使用#concatMap
+ 抛出Mono.error
在映射期间引发异常的选项之一是替换
map为
concatMap。从本质上讲,
concatMap它的作用几乎相同
flatMap。唯一的区别是
concatMap一次仅允许一个子流。这种行为大大简化了内部实现,并且不影响性能。因此,我们可以使用以下代码来以更实用的方式引发异常:
Mono.just(userId) .map(repo::findById) .concatMap(user-> { if(!isValid(user)){ return Mono.error(new InvalidUserException()); } return Mono.just(user); })在上述示例中,如果用户无效,我们将使用返回异常
Mono.error。我们可以使用
Flux.error以下方法对通量执行相同的操作:
Flux.just(userId1, userId2, userId3) .map(repo::findById) .concatMap(user-> { if(!isValid(user)){ return Flux.error(new InvalidUserException()); } return Mono.just(user); })注意 ,在两种情况下,我们都返回仅包含一个元素的 冷流 。在Reactor中,在返回的流是冷 标量
流的情况下,有一些优化可以提高性能。因此,建议使用流量/单声道
concatMap+
.just,
empty,
error其结果是,当我们需要更复杂的映射,这可能与最终
return null或
throw new ...。
注意! 永远不要检查传入元素的可空性。Reactor项目永远不会
null为您发送值,因为这违反了Reactive
Streams规范(请参见规则2.13)。因此,如果repo.findById返回null,Reactor将为您抛出NullPointerException。
等等,为什么concatMap
比这更好flatMap
?
从本质上讲,
flatMap它旨在合并一次执行的多个子流中的元素。这意味着flatMap在其下应具有异步流,因此它们可能在多个线程上处理数据,也可能是多个网络调用。随后,这种期望对实现产生了很大的影响,因此
flatMap应该能够处理来自多个流的数据
Thread(意味着并发数据结构的使用),如果另一个流耗尽了,则使元素排队(意味着
Queue为每个s
分配额外的内存子流),并且不违反反应式流规范规则(意味着确实非常复杂的实现)。计算所有这些事实以及我们取代平原的事实
map使用
Flux/Mono.error(不改变执行的同步性)引发异常的更方便的方式(同步操作)导致我们不需要这样一个复杂的运算符,并且可以使用更简单
concatMap的异步处理对象一次只能处理一个流,并进行了一些优化以处理标量冷流。
使用抛出异常 switchOnEmpty
因此,在结果为空时引发异常的另一种方法是
switchOnEmptyoperator。以下代码演示了如何使用该方法:
Mono.just(userId) .flatMap(repo::findById) .switchIfEmpty(Mono.error(new UserNotFoundExeception()))
如我们所见,在这种情况下,
repo::findById应该将
Monoof
User作为返回类型。因此,在
User找不到实例的情况下,结果流将为空。因此,Reactor将调用
Mono指定为
switchIfEmptyparameter
的Alternative 。
照原样抛出您的异常
可以将其视为可读性差的代码或不良做法( 我个人认为 ),但是您可以像使用Project
Reactor一样引发异常。即使这样做在某种程度上可能会违反Reactive Streams规范( 在这种情况下 从语义的角度来看是 违反
的,因为在幕后的运算符是s
Subscriber链中
Subscriber的a,因此-
从语义上讲,在lambda中引发异常可以映射为引发
onNext违反规范规则2.13的方法中的异常)。但是,由于Reactor会为您捕获引发的异常,然后将其作为
onError信号传播到下游,因此不禁止这样做。
外卖
- 使用
.handle
运算符以提供复杂的元素处理 - 当我们需要在映射过程中引发异常时,请使用
concatMap
+Mono.error
,但是这种技术最适合异步元素处理的情况。 - 当我们已经到位时使用
flatMap
+Mono.error``flatMap
Null
因为返回类型是禁止的,所以您将null
在下游而不是下游使用map``onError``NullPointerException
- 使用
switchIfEmpty
在所有情况下,当你需要发送一个错误信号,如果在调用一些特定的函数的结果与完成 空 流



