栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Android中的RxJava

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Android中的RxJava

最近准备梳理一下Kotlin,先复习一遍RxJava思想,做个学习笔记+伪代码,整个脉络分为三个部分。

(一)使用场景

RxJava是重量级、最复杂的框架(没有之一),JakeWharton 的巅峰之作,操作符非常丰富、特别庞大,学关键的内容,学思维方式,看PPT资料,学两遍。

为什么要学习RxJava?

改变思维(Rx思维)来提升效率,响应式编程/异步事件流编程

Rx思维:起点(分发事件)—>…—>终点(消费事件),中间不会断掉且可以做拦截,链条式思维

学习资料
  • 源码:JakeWharton/Star45.3/v3.x

  • 思维扩展:Rx系列

【以下五部分难度逐步提升!!!】

一、核心思想(基础⭐)

传统思维:不同项目(程序员)实现有不同的思路,封装、Thread、线程池……

dialog—>Thread/AsyncTask/…—>Handler—>UI

Rx思维/卡片式编程/观察者设计模式:
起点/被观察者/Observable—>订阅—>终点/观察者/Observer

封装线程调度,方便多处使用线程切换:

// 封装线程调度,UD:upstram/downloadstream
private final static  ObservableTransformer rxud {
	return new ObservableTransformer(){
		@override
		public ObservableSource apply(Observable upstream){
			// 方法返回自己this,链式调用无限扩展,可以继续作死的调用
			return upstream.subscribeOn(Schedulers.io())//给上层分配异步线程
				.observerOn(AndroidSchedulers.mainThread())// 给下层分配主线程
				.map(new Function(){
					Log.d(TAG, "balabala...");
					return null;
				});
		}
	}
}

事件触发:

public void reJavaDownloadImageAction(View view){
	Observable.just(PATH)// ②起点
		.map(new Function(){// ③卡片式拦截
			...// 请求服务器下载图片操作
			return bitmap;
		})
		.map(new Function(){// +需求:水印
			...// 加水印操作
			return newBitmap;
		})
		.map(new Function(){// +需求:日志
			Log.d(TAG, "balabala...");// Log
			return newBitmap;
		})
		// 线程调度
		//.subscribeOn(Schedulers.io())
		//.observerOn(AndroidSchedulers.mainThread())
		.compose(rxud())// 可以抽取封装起来
		.subscribe(// 订阅,上层区域和下层区域
			new Observer() {// 终点
				onSubScribe(Disposable d)// ①订阅开始,预备操作
				onNext(Bitmap bitmap){// ④拿到事件,和起点类型一致
					image.setImageBitmap(bitmap);
				}
				onError(Throwable e)// 错误事件
				onComplete()// ⑤完成事件
			}
		);
}
二、RxJava配合Retrofit(常用⭐⭐)

常用的网络模型框架开发组合套装:
Retrofit(通过OkHttp请求网络)—>RxJava(仅处理返回数据)—>UI
注:Retrofit不是网络框架,是个强大的封装框架,负责管理

网络请求接口

interface WanAndroidApi{
	// 异步线程,耗时操作

	// 总数据
	@GET("project/tree/json")
	Observable getProject();

	// Item数据
	//@GET("project/list/1/json?cid=294")
	@GET("project/list/{pageIndex}/json")// 使用注解动态传参
	Observable getProject(@Path("pageIndex")int pageIndex, @Query("cid")int cid);
}

具体工具类封装:

HttpUtil {
	public static String base_URL="https://www.wanandroid.com/";
	
	public static void setbaseUrl(String baseUrl) {
		base_URL = baseUrl;
	}

	// 根据各种配置创建出Retrofit
	public static Retrofit getOnlinecookieRetrofit() {
		// OkHttp客户端
		OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
		HttpLoggingInterceptor logInterceptor = new HttpLoggingInterceptor(new HttpLogger());
		logInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
		OkHttpClient okHttpClient = httpBuilder
				.addInterceptor(logInterceptor)
				.addNetworkInterceptor(new StethoInterceptor())
				.readTimeout(1000, TimeUnit.SECONDS)
				.connectTimeout(1000, TimeUnit.SECONDS)
				.writeTimeout(1000, TimeUnit.SECONDS)
				.build();

		return new Retrofit.Builder().baseUrl(base_URL)
			// step1:请求用OkHttp
			.client(okHttpClient)
			// step2:响应用RxJava
			.addConverterFactory(GsonConverterFactory.create(new Gson()))// 添加一个json解析工具
			.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
			.build();
	}
}

Activity 中使用:

private WanAndroidApi api;

onCreate() {
	api = HttpUtil.getOnlinecookieRetrofit().create(WanAndroidApi.class);
}

onClick(View view){
	api.getProject()
		.subscribeOn(Schedulers.io)//io专门处理大量的读取数据,异步类型
		.observeOn(AndroidSchedulers.mainThread())
		// 1. 标准使用方式
		
		// 2. 简化全部内置函数
		
		// 3. 使用lambda写法类型自动推导,代码简洁,不推荐,逻辑不是很清晰
		.subscribe(data-> {
			Log.d(TAG, "getProjectAction:" + data);
		});
}

onItemClick(View view){
	// 点击列表Item,两层逻辑嵌套...
}
三、View防抖(大公司/RxBinding⭐⭐⭐)

定义:瞬间连续点击/自动化脚本……

onClick(View view){
	antiShakeAction();
}

// 针对某控件2秒内点击20次,只响应一次。n层逻辑嵌套...
@SuppressLint("CheckResult")
antiShakeAction() {
	Button bt_anti_shake = findViewById();
	RxView.click(bt_anti_shake)
		.throttleFirst(2000, TimeUnit.MILLISECONDS)// 2秒
		.subscribe(new Consumer(){
			@override
			public void accept(Object o) throws Exception {
				api.getProject()
					.compose(DownloadActivity.rxud())
					.subscribe(new Consumer(){
						@override
						public void accept(ProjectBean projectBean) throws Exception{
							for(ProjectBean.DataBean dataBean : projectBean.getData()){
								api.getProjectItem(1, dataBean.getId())
									.compose(DownloadActivity.rxud())
									.subscribe(new Consumer(){
										@override
										public void accept(ProjectItem projectItem) throws Exception{
											Log.d(TAG, "accept:"+projectItem);//UI操作
										};
									});
							}
						};
					});
			};
		});
}
 
四、网络嵌套(实用⭐⭐⭐⭐) 

下一步操作依赖于上一步操作得结果,先查询主数据,再查询具体的数据,如上

使用flatMap操作符解决,自己会分发N多数据

onClick(View view){
	//antiShakeAction();
	antiShakeActionUpdate();
}

// 针对某控件2秒内点击20次,只响应一次。n层逻辑嵌套...
@SuppressLint("CheckResult")
antiShakeActionUpdate() {
	Button bt_anti_shake = findViewById();
	RxView.click(bt_anti_shake)
		.throttleFirst(2000, TimeUnit.MILLISECONDS)
		.observeOn(Schedules.io())// 先给下面切换异步线程
		.flatMap(new Function>(){
			@override
			public ObservableSource apply(Object o)throws Exception {
				return api.getProject();// 返回主数据
			}
		})
		.flatMap(new Function>(){
			@override
			public ObservableSource apply(ProjectBean projectBean)throws Exception {
				// 自己搞一个发射器,发送多次,等价于上步的for循环
				return Observable.formIterable(projectBean.getData());
			}
		})
		.flatMap(new Function>(){
			@override
			public ObservableSource apply(ProjectItem projectItem)throws Exception {
				return api.getProjectItem(1, projectItem.getId());
			}
		})
		.observeOn(AndroidSchedulers.mainThread())// 再给下面切换主线程
		.subscribe(new Consumer(){
			@override
			public void accept(ProjectItem projectItem) throws Exception {
				// 显示数据
			}
		});
}
五、doOnNext运用(难度最高⭐⭐⭐⭐⭐)

使用场景:主线程和异步线程之间频繁的线程切换(银行的业务……)

Retrofit + RxJava 实现模拟案例:

  1. show progressDialog
  2. 请求服务器注册操作
  3. 注册完成之后更新注册UI
  4. 马上登录服务器操作
  5. 登录完成之后更新登录UI
LoginRequest {}
RegisterRequest {}
LoginResponse {}
RegisterResponse {}
interface IRequestNetwork {
	public Observable registerAction(@Body RegisterRequest registerRequest);
	public Observable loginAction(@Body LoginRequest loginRequest);
}
// 方式一,单独执行
request(View view) {
	MyRetrofit.createRetrofit().create(IRequestNetwork.class)
		.registerAction(new RegisterRequest())
		.compose(DownloadActivity.rxud())
		.subscribe(new Consumer() {
			accept(RegisterResponse registerResponse) {
				// 更新Register UI
			}
		});

	MyRetrofit.createRetrofit().create(IRequestNetwork.class)
		.loginAction(new LoginRequest())
		.compose(DownloadActivity.rxud())
		.subscribe(new Consumer() {
			accept(LoginResponse loginResponse) {
				// 更新Login UI
			}
		});
}

// 方式二,优化为流式代码
Disposable disposable;// 为了销毁释放提取为全局,消除代码黄色警告
requestBetter(View view) {
	disposable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
		.registerAction(new RegisterRequest())// step2
		.subscribeOn(Schedulers.io())
		.observeOn(AndroidSchedulers.mainThread())
		.doOnNext(new Consumer(){
			accept(RegisterResponse registerResponse) {
				// step3
			}
		})
		.observeOn(Schedulers.io())
		// step4
		.flatMap(new Function>(){
			@override
			public ObservableSource apply(LoginResponse loginResponse)throws Exception {
				Obsevable loginResponseObsevable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
					.loginAction(new LoginRequest());
				return loginResponseObsevable;
			}
		})
		.subscribeOn(AndroidSchedulers.mainThread())
		.subscribe(new Observer() {
			onSubscribe(Disposable d) {// step1
				progressDialog.show();
				disposable = d;
			}
			onNext()// step5:更新Login UI
			onError()
			onComplete()// step6:dismiss progressDialog
		});
}

onDestory() {
	if(disposable!=null && !disposable.isDisposed()){
		disposable.dispose();// 规范写法,必须释放
	}
}
思考:说说自己对RxJava核心思想的理解?

有一个起点和一个终点,起点开始流向我们的事件,把事件流向终点,在流向过程中可以添加拦截,拦截时可以对事件进行改变操作,终点只关心它上一个拦截,根据上一个拦截的变化而变化。


(二)模式与原理

源码、流程分析,观察者设计模式,map变换操作符原理

// TODO 看书,关于观察者设计模式的部分、并发编程线程池

被观察者抽象层-−—−┐ 观察者抽象层
Observable ├—引用—→Observer
↑ | ↑
实现 | 实现
| | |
被观察者实现层【容器】 | 观察者实现层
ObservableImpl−-−–┘ ObserverImpl

RxJava的Hook点,对整个项目全局(静态)的RxJava的监听、拦截
RxJavaPlugins.onAssembly(...)

RxJava 1.x 预留onAssembly()方法无操作
↓ 优化
RxJava 2.x 调用setonObservableAssembly()方法赋值,优先执行

RxJava 3.x

Hook机制:钩子程序,逆向

结论:很多操作符都会经过全局onAssembly监听

RxJava的观察者模式和标准的观察者设计模式不同

  1. 创建Observable
  2. 创建Observer
  3. 使用subscribe()订阅
  4. 使用map()(发送单个)/flatMap()(发送多次)
洋葱模型:流程是U型结构封包裹、拆包裹
// step2:Observable创建源码,返回ObservableCreate,ObservableOnSubscribe自定义标签,判空+onAssembly(),并把自定义source丢进ObservableCreate
Observable.create(new ObservableOnSubscribe(){
		subscribe(ObservableEmitter emitter) {
			emitter.onNext("A");
		}
	})
	// step4:得到ObservableMap进入subscribeActual(),层层包裹的洋葱模型,执行上一层的source.subscribe()把包裹传回到ObservableCreate里面,传进发射器Emitter里面。Function抽象块,具体传入的参数是实现块,内部进行类型变换。
	.map(new Function())
	// step1:先看Observer的源码,最简单,interface+泛型
	// step3:订阅过程ObservableCreate.subscribe()方法,传入自定义观察者(终点),健壮性校验,ObservableCreate.subscribeActual()抽象函数
	.subscribe(new Observer(){
		onSubscribe()// subscribe后马上执行
		onNext(String s)// 拿到上一个卡片流下来的数据,类型和起始泛型保持类型一致
		onError()// 执行错误
		onComplete()// 时间结束
	});

标准的观察者模式中,有一个被观察者+N多观察者,被观察者发生改变,所有的观察者随之得到改变事件。

RxJava中多个被观察者observable/map()+一个观察者Observer,订阅后马上触发。严格来讲RxJava叫发布订阅模式,多出一个抽象层做转换,实质上不是观察者模式,达到的效果一样。

RxJava中的装饰模型,U型结构概括不够完整:
Observable.create(new ObservableOnSubscribe(){
		subscribe(ObservableEmitter emitter) {
			emitter.onNext("A");
		}
	})
	.map(new Function())
	.map(new Function())
	// ...
	.subscribe(new Observer(){
		onSubscribe()
		onNext(String s)
		onError()
		onComplete()
	});

由内而外:

  • ObservableCreate
  • ObservableMap
  • ObservableMap
  • Subscribe

上下来回的流程:

  • 装饰模型↓(订阅/触发导火线—>必须调用,否则后续无法执行)
  • 封包裹↑
  • 拆包裹/执行流程onNext()↓
背压使用/策略/原理:

生产的速度>消费的速度,导致的内存泄漏
使用Flowable解决背压。

Single是Observable简化版,有局限性

思考:用自己的理解画出map变换操作符详细思路、流程图。
(三)原理与自定义操作符

线程切换(线程调度)原理、自定义RxView操作符

异步事件流编程:

执行过程中可以随意的切换线程

RxAndroid 在客户端开发中必引入,配合使用
  • RxJava 80%
  • RxAndroid 20% AndroidSchedulers.mainThread()

create()最原始的方式,执行过程是可控的,just()内部封装,不可控。

// step2:ObservableCreate调用subscribeOn()方法触发
.subscribeOn(
	// step1
	// 内部RxJavaPlugins.onIoScheduler(IO)进行Hook,唯一赋值函数为setIoSchedulerHandler()
	// 策略机制,有多种策略……
	Schedulers.io()// 耗时读异步操作
	// Schedulers.newThread()// 开启新线程,频繁的
)
.subscribe()// step3:ObservableSubScribeOn调用subscribe()方法触发

onCreate() {
	// IO传递进去的hook,
	RxJavaPlugins.setIoSchedulerHandler(new Function(){
		apply(Scheduler scheduler) {
			Log.d(TAG, "全局监听scheduler:"+scheduler);
			return scheduler;
		}
	});

	// 初始化IO的hook
	RxJavaPlugins.setInitIoSchedulerHandler(new Function, Scheduler>(){
		apply(Callable schedulerCallable) {
			Log.d(TAG, "全局监听schedulerCallable"+schedulerCallable);
			return schedulerCallable.call();
		}
	});
}

RxJavaPlugins.java

static {
	...
	IO = RxJavaPlugins.initIoScheduler(new IOTask());
	NEW_THREAD = RxJavaPlugins.initIoScheduler(new NewThreadTask());
	...
}
DEFAULT = new IoScheduler();
DEFAULT = new NewThreadScheduler();

结论: 经过了层层包装,最终交给线程池管控。

除了onSubscribe()都是异步线程。

测试终点切回主线程

new Thread() {
	run();
	test();
}.start();

// 子线程做事
test() {
	...
	.observerOn(
		AndroidSchedulers.mainThread()
	)
}

通过Handler切换回主线程,Looper.getMainLooper()能保证100%在主线程

DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

如果当前是主线程就不做切换,节省开销。

自定义操作符RxView:防抖控件

操作符就是函数。

RxView {
	TAG = RxView.class.getSimpleName();

	Observable clicks(View view) {
		return ViewClickObservable(view);
	}
}
 
ViewClickObservable extends Observable {
	private final View view;

	// 事件
	private static final Object EVENT = new Object();
	private static Object EVENT2;

	public ViewClickObservable(View view) {
		this.view = view;
		EVENT2 = view;
	}

	subscribeActual(Observer observer) {
		MyLisener myListener = new MyListener(view, observer);
		observer.onSubscribe(myListener);
		this.view.setOnClickListener(myListener);
	}

	static final class MyListener implements View.OnClickListener, Disposable {
		private final View view;
		private final Observer observer;
		// 原子性 [理解AtomicBoolean](https://www.jianshu.com/p/8a44d4a819bc)
		private final AtomicBoolean isDisposable = new AtomicBoolean();// 原子类型

		public MyListener(View view, Observer observer) {
			this.view = view;
			this.observer = observer;
		}

		onClick(View v){
			if(isDisposed() == false) {
				observer.onNext(EVENT)
			}
		}
		dispose(){// 中断
			if(isDisposable.compareAndSet(false, true)){
				// 主线程
				if(Looper.myLooper() == Looper.getMainLooper()){
					view.setOnClickListener(null);
				} else {// 子线程通过Handler切换
					
					// 不使用Handler,使用Rx中的代码风格
					AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
						run() {
							view.setOnClickListener(null);
						}
					});
				}
			}
		}
		isDisposed(){
			return isDisposable.get();
		}
	}
}
 

在 Activity 中使用:

onCreate() {
	Button button = findViewById();

	RxView.clicks(button)
		.throttleFirst(2000, TimeUnit.SECONDS)
		.subscribe(new Consumer(){
			accept(Object o) {
				Observable.create(new ObservableOnSubscribe(){
					subscribe(ObservableEmiiter e){
						e.onNext("aaaa");
					}
				})
				.subscribe(new Consumer(){
					accept(String s){
						Log.d(TAG, s);
					}
				});
			}
		});
}
 
思考:自己对RxJava线程切换的理解?

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号