WebClient.exchange()
结果流是单播的
实际上,这里的问题是
WebClient每个连接仅允许一个订户。如果您尝试两次订阅相同的 交换 连接-
您将获得
java.lang.IllegalStateException: only one connection receive subscriberallowed.
尽管我看不到您尝试在哪里重用同一连接两次的事实,但我相信您可以通过使用下一个运算符组合来解决该问题:
class GeoService() { val client = WebClient.create("https://maps.googleapis.com/maps/api/geopre/") fun resolveGeoFromCity(city: String): Mono<Geo> { return client.get() .uri("json?address=$city&key=$API_KEY&language=en") .exchange() .flatMap { it.bodyToMono(String::class.java) } .map { parse(it) } .share(); } ...}在该示例中,流被配置为多播(共享)原始源,只要有至少一个
Subscriber订阅源即可。如果您需要所有订阅者都收到相同的日期,可以
.share用
.cache运营商代替。
此外,还有一种替代上述技术的方法。您可以将上述运算符替换为处理器,并获得相同的共享可能性:
class GeoService() { val client = WebClient.create("https://maps.googleapis.com/maps/api/geopre/") fun resolveGeoFromCity(city: String): Mono<Geo> { return client.get() .uri("json?address=$city&key=$API_KEY&language=en") .exchange() .flatMap { it.bodyToMono(String::class.java) } .map { parse(it) } .subscribeWith(DirectProcessor.create()); } ...}在这种情况下,您正好在调用之后立即订阅和运行对源数据的使用
subscribeWith,因此,在这种情况下,可能会丢失部分数据,依此类推。
为什么Mono.just(..)
一切正常?
首先
.just是冷操作员,它允许尽可能多的用户在任何时间点接收相同的数据。这就是为什么当您尝试两次从连接中使用相同的数据块时,没有得到任何异常的原因。



