- 依赖
- 五种被观察者
- 订阅者
- 创建操作符
- 转换操作符
- 组合操作符
- 功能操作符
- 过滤操作符
- 条件操作符
- 自定义Observer
- 对上下游所处线程进行封装
1.依赖
implementation 'io.reactivex.rxjava2:rxjava:2.2.21' implementation 'io.reactivex.rxjava2:rxandroid:2.0.2' implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'//实现防抖功能
2.五种被观察者
(1)Observer
Observer observer = new Observer
3.订阅者
4.创建操作符
(1) create()
(2)just()
简化Create操作,最多只能传入10个参数
(3)fromArray()
简化Create操作 可能传入无限个参数
(4)fromFuture()
与线程安全有关
(5)fromIterable()
可以传入继承了Iterable接口的集合
(6)range()
可以方便确定范围
5.转换操作符
(1)map()
可以将被观察者发送的数据类型转变为其他类型
(2)flatMap()
可以将事件序列中的元素进行加工,一一返回新的被观察者(s)
可以处理嵌套for循环、嵌套网络请求的情况
(3)concatMap()
在flatMap的基础上保证转发出来的事件是有序的
(4)buffer()
从需要发射的事件序列中放进缓存区,达一定数量时一起发送
6.组合操作符
(1)concat()
将多个被观察者所需要发送的事件组合在一个被观察者中,一起发送(即调用一个被观察者,多次调用onNext()方法)最多发送4个
(2)concatArray()
在concat()的基础上可以传无限个被观察者
(3)merge()
在concat()的基础上,实现并行操作
7.功能操作符
(1)subscribeOn()
确定上游事件所处线程
(2)observeOn()
确定下游事件所处线程
(3)参数
- Schedulers.newThread() — 总是启用新线程,并在新线程中执行操作;多用于耗时操作
- Schedulers.io() — 通常用于网络、读写文件等IO密集型的操作,行为模式和new Thread()差不多,只是IO的内部是一个无上限的线程池,可重用空闲的线程,更高效
- AndroidSchedulers.mainThread() — Android的主线程;用于更新UI
- Schedulers.computation() — 代表CPU计算密集型的操作
8.过滤操作符
(1)filter()
满足条件true则通过;不满足条件false过滤掉
9.条件操作符
10.自定义Observer
- 继承Observer
- 确定泛型类型
- 重写4个主要方法
- 使用模板设计模式
abstract class APIResponse(val context : Context) : Observer >{ private var isShow = true //次构造函数 -> 主动控制Dialog显示 constructor(context: Context,isShow : Boolean) : this(context){ this.isShow = isShow } //模板设计模式 abstract fun success(data : T ?) abstract fun failure(errorMsg: String ? ) //订阅---最先调用 override fun onSubscribe(d: Disposable) { // 弹出 加载框 if (isShow) { LoadingDialog.show(context) } } //向下游---上游流下了的数据 override fun onNext(t: LoginResponseWrapper ) { if (t.data == null) { // 失败 failure("登录失败了,请检查原因:msg:${t.errorMsg}") } else { // 成功 success(t.data) } } //错误---上游流下了的错误 override fun onError(e: Throwable) { // 取消加载 LoadingDialog.cancel() failure(e.message) } //完成---传递数据完成后 override fun onComplete() { // 取消加载 LoadingDialog.cancel() } }
11.对上下游所处线程进行封装
private staticObservableTransformer rxud() { return new ObservableTransformer () { @Override public ObservableSource apply(Observable upstream) { return upstream.subscribeOn(Schedulers.io()) // 给上面代码分配异步线程 .observeOn(AndroidSchedulers.mainThread()) // 给下面代码分配主线程; //hook .map(new Function () { @Override public UD apply(UD ud) throws Exception { Log.d(TAG, "apply: 我监听到你了,居然再执行"); return ud; } }); } }; }



