栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

六、RxJava合并startWith,concatWith,concat,merge,zip

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

六、RxJava合并startWith,concatWith,concat,merge,zip

1.项目地址

在MainActivity6

2.讲解

就是把多个上流拼接后,再打印

1.startWith

将两个被观察者拼接起来,拼接的后面运行。下面运行结果是1000,2000,3000,1,2,3。 被拼接的前面运行

Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                //第2执行
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        })
                //startWith表示在头部拼接,既拼接的会先执行
                .startWith(Observable.create(new ObservableOnSubscribe() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                        //第1执行
                        emitter.onNext(1000);
                        emitter.onNext(2000);
                        emitter.onNext(3000);
                        emitter.onComplete();
                    }
                }))
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d(TAG, "accept: "+integer);
                    }
                });
2.concatWith

拼接的后面运行,这个跟startWith反过来。下面运行结果是1,2,3,1000,2000,3000,

Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                //第2执行
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        })
                //concatWith拼接的会后执行
                .concatWith(Observable.create(new ObservableOnSubscribe() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                        //第1执行
                        emitter.onNext(1000);
                        emitter.onNext(2000);
                        emitter.onNext(3000);
                        emitter.onComplete();
                    }
                }))
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d(TAG, "accept: "+integer);
                    }
                });
3.concat

把几个拼接起来,然后依次执行。下面运行结果是1,2,3,4

Observable.concat(
                Observable.just(1),
                Observable.just(2),
                Observable.just(3),
                Observable.create(new ObservableOnSubscribe() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                        emitter.onNext("4");
                        emitter.onComplete();
                    }
                })
        )
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Serializable serializable) throws Throwable {
                        Log.d(TAG, "accept: "+serializable);
                    }
                });
4.merge

拼接被观察者(上游),然后并列运行。并列运行的第一个值随机,可以是第一个被观察者的值,也可以是2,或者3被观察者。
intervalRange的第一个参数是打印的值,第二个是执行几次,第三个是第一个执行的延迟时间,第四个是间隔时间,第五个是第四个的间隔时间的单位

//并发执行,执行的时候,随机执行三组的其中一个
        Observable longObservable1 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
        Observable longObservable2 = Observable.intervalRange(6, 5, 1, 2, TimeUnit.SECONDS);
        Observable longObservable3 = Observable.intervalRange(11, 5, 1, 2, TimeUnit.SECONDS);
        Observable.merge(longObservable1,longObservable2,longObservable3)
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.d(TAG, "accept: "+aLong);
                    }
                });

运行结果:

5.zip

对被观察者进行各个值得拼接,如“英语”与77拼接,再加“”等值,就变成“课程英语77”。这里zip的1,2个参数为被观察者Observable。

Observable stringObservable1 = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                emitter.onNext("英语");
                emitter.onNext("数学");
                emitter.onNext("语文");
                emitter.onNext("物理");
                emitter.onComplete();
            }
        });
        Observable stringObservable2 = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
                emitter.onNext(77);
                emitter.onNext(88);
                emitter.onNext(99);
                emitter.onComplete();
            }
        });

        Observable.zip(stringObservable1, stringObservable2, new BiFunction() {//最后一个传入的参数StringBuffer为下游输出值
            @Override
            public StringBuffer apply(String s, Integer integer) throws Throwable {
                return new StringBuffer().append("课程"+s).append("==").append(integer);
            }
        }).subscribe(new Observer() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: 准备进入考试,考试了……");
            }

            @Override
            public void onNext(@NonNull StringBuffer stringBuffer) {
                Log.d(TAG, "onNext: 考试结果输出:"+stringBuffer.toString());
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 考试结束");
            }
        });

运行结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600992.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号