RxJava在异步/多线程方面经常被误解。多线程操作的编码很简单,但是了解抽象是另一回事。
关于RxJava的一个常见问题是如何实现并行化,或如何从Observable同时发出多个项目。当然,此定义违反了Observable
Contract,该协议规定onNext()必须被顺序调用,并且一次不能由多个线程同时调用。
要实现并行性,您需要多个Observable。
这在一个线程中运行:
Observable<Integer> vals = Observable.range(1,10);vals.subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));这在多个线程中运行:
Observable<Integer> vals = Observable.range(1,10);vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i))).subscribe(val -> System.out.println(val));
代码和文本来自此博客文章。



