在MainActivity6
2.讲解就是把多个上流拼接后,再打印
1.startWith将两个被观察者拼接起来,拼接的后面运行。下面运行结果是1000,2000,3000,1,2,3。 被拼接的前面运行
Observable.create(new ObservableOnSubscribe2.concatWith() { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { //第2执行 emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }) //startWith表示在头部拼接,既拼接的会先执行 .startWith(Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { //第1执行 emitter.onNext(1000); emitter.onNext(2000); emitter.onNext(3000); emitter.onComplete(); } })) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Throwable { Log.d(TAG, "accept: "+integer); } });
拼接的后面运行,这个跟startWith反过来。下面运行结果是1,2,3,1000,2000,3000,
Observable.create(new ObservableOnSubscribe3.concat() { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { //第2执行 emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }) //concatWith拼接的会后执行 .concatWith(Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { //第1执行 emitter.onNext(1000); emitter.onNext(2000); emitter.onNext(3000); emitter.onComplete(); } })) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Throwable { Log.d(TAG, "accept: "+integer); } });
把几个拼接起来,然后依次执行。下面运行结果是1,2,3,4
Observable.concat(
Observable.just(1),
Observable.just(2),
Observable.just(3),
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
emitter.onNext("4");
emitter.onComplete();
}
})
)
.subscribe(new Consumer() {
@Override
public void accept(Serializable serializable) throws Throwable {
Log.d(TAG, "accept: "+serializable);
}
});
4.merge
拼接被观察者(上游),然后并列运行。并列运行的第一个值随机,可以是第一个被观察者的值,也可以是2,或者3被观察者。
intervalRange的第一个参数是打印的值,第二个是执行几次,第三个是第一个执行的延迟时间,第四个是间隔时间,第五个是第四个的间隔时间的单位
//并发执行,执行的时候,随机执行三组的其中一个
Observable longObservable1 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
Observable longObservable2 = Observable.intervalRange(6, 5, 1, 2, TimeUnit.SECONDS);
Observable longObservable3 = Observable.intervalRange(11, 5, 1, 2, TimeUnit.SECONDS);
Observable.merge(longObservable1,longObservable2,longObservable3)
.subscribe(new Consumer() {
@Override
public void accept(Long aLong) throws Throwable {
Log.d(TAG, "accept: "+aLong);
}
});
运行结果:
对被观察者进行各个值得拼接,如“英语”与77拼接,再加“”等值,就变成“课程英语77”。这里zip的1,2个参数为被观察者Observable。
ObservablestringObservable1 = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { emitter.onNext("英语"); emitter.onNext("数学"); emitter.onNext("语文"); emitter.onNext("物理"); emitter.onComplete(); } }); Observable stringObservable2 = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable { emitter.onNext(77); emitter.onNext(88); emitter.onNext(99); emitter.onComplete(); } }); Observable.zip(stringObservable1, stringObservable2, new BiFunction () {//最后一个传入的参数StringBuffer为下游输出值 @Override public StringBuffer apply(String s, Integer integer) throws Throwable { return new StringBuffer().append("课程"+s).append("==").append(integer); } }).subscribe(new Observer () { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe: 准备进入考试,考试了……"); } @Override public void onNext(@NonNull StringBuffer stringBuffer) { Log.d(TAG, "onNext: 考试结果输出:"+stringBuffer.toString()); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { Log.d(TAG, "onComplete: 考试结束"); } });
运行结果:



