现在有一个需求,下载一张照片,用户点击下载,弹出正在下载提示框,下载结束显示图片,关闭提示框
传统思维1.封装thread
2.编写网络请求代码
3.拿到数据创建bitmap
4.handler回调,更新ui
public void downloadImageAction(View view) {
progressDialog = new ProgressDialog(this);
progressDialog.setTitle("下载图片中...");
progressDialog.show();
new Thread(new Runnable() {
@Override
public void run() {
try {
URL url = new URL(PATH);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = httpURLConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
Message message = handler.obtainMessage();
message.obj = bitmap;
handler.sendMessage(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private final Handler handler = new Handler(new Handler.Callback() {
@Override
public boolean handleMessage(@NonNull Message msg) {
Bitmap bitmap = (Bitmap) msg.obj;
image.setImageBitmap(bitmap);
if (progressDialog != null) progressDialog.dismiss();
return false;
}
});
Rx思想
观察者设计模式,起点是被观察者,终点是观察者,一条流水线的思维,响应式编程
对于下载图片这个功能,用rx思想其实也是上面4步,只是他从头到尾 是一条线,rx框架把这四个操作封装成了一行代码,他提供了切换线程,转换数据的方法,看起来是一体的,不会断,不像传统思维一样散
public void rxJavaDownloadImageAction(View view) {
// 起点
Observable.just(PATH) // 内部会分发 PATH Stirng // TODO 第二步
// TODO 第三步
.map(new Function() {
@Override
public Bitmap apply(String s) throws Exception {
URL url = new URL(PATH);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = httpURLConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
return null;
}
})
//切换线程 之上的异步线程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//之下的主线程
// 订阅 起点 和 终点 订阅起来
.subscribe(
// 终点
new Observer() {
// 订阅开始
@Override
public void onSubscribe(Disposable d) {
// 预备 开始 要分发
// TODO 第一步
progressDialog = new ProgressDialog(DownloadActivity.this);
progressDialog.setTitle("download run");
progressDialog.show();
}
// TODO 第四步
// 拿到事件
@Override
public void onNext(Bitmap bitmap) {
image.setImageBitmap(bitmap);
}
// 错误事件
@Override
public void onError(Throwable e) {
}
// TODO 第五步
// 完成事件
@Override
public void onComplete() {
if (progressDialog != null)
progressDialog.dismiss();
}
});
}
如果现在想给图片加水印的话,这个过程也不会断,只需要在map后边再加一个map
.map(new Function() { @Override public Bitmap apply(Bitmap bitmap) throws Exception { Paint paint = new Paint(); paint.setTextSize(88); paint.setColor(Color.RED); return drawTextToBitmap(bitmap, "同学们大家好",paint, 88 , 88); } })
这就是rx的思想,从字符串到照片,再到带水印的照片他是一条线,不想要就可以直接删掉,方便拆卸
上边代码还可以改进,封装线程切换的操作
ObservableTransformer是RxJava提供的转换器,可以将一系列操作进行封装
public final staticObservableTransformer rxud() { return new ObservableTransformer () { @Override public ObservableSource apply(Observable upstream) { return upstream.subscribeOn(Schedulers.io()) // 给上面代码分配异步线程 .observeOn(AndroidSchedulers.mainThread()) // 给下面代码分配主线程; .map(new Function () { @Override public UD apply(UD ud) throws Exception { Log.d(TAG, "apply: 我监听到你了,居然再执行"); return ud; } }); } }; } //.subscribeOn(Schedulers.io()) //.observeOn(AndroidSchedulers.mainThread()) //上面两行代码可以换成 //.compose(rxud())
注意事项:Rx使用过程中要记得销毁,否则内存泄漏
@Override
protected void onDestroy() {
super.onDestroy();
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
实战
嵌套查询,功能防抖
flatMap和doOnNext的使用
demo链接:https://pan.baidu.com/s/1cUV6Dg9XYaJNRoCHATRXFQ?pwd=fymf
提取码:ucpe
就比如公众号,如果说作者发布了一条消息,关注他的都可以收到消息,对应的我们用户就是观察者,而公众号就是被观察者,也就是说我们观察者要到被观察者进行注册(容器存储),发生改变以后,通知观察者做出响应
特征:一个被观察者多个观察者
Rx观察者模式就比如下载图片,加水印,并展示的需求,对于Rx来说终点只有一个,也就是终点观察上方的活动,每一个节点都是与一个被观察者
特征:多个被观察者一个观察者
源码分析我们先看一段简单的代码
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("zjm"); } }) .subscribe(new Observer () { @Override public void onSubscribe(@NonNull Disposable d) { Log.e("SimpleRx", "我们开始喽"); } @Override public void onNext(@NonNull String s) { Log.e("SimpleRx", "要结束了喽"); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { Log.e("SimpleRx", "真正结束了"); } }); //打印结果 //2022-02-19 11:31:05.459 15258-15258/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽 //2022-02-19 11:31:05.459 15258-15258/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm //2022-02-19 11:31:05.460 15258-15258/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = zjm //onComplete未执行?后边解释
其实就是两个方法create和subscribe
跟着结果去查看源码
io.reactivex.Observable#create
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public staticObservable create(ObservableOnSubscribe source) { //异常判断 ObjectHelper.requireNonNull(source, "source is null"); //现在抛开主流程,我们着重分析一下RxJavaPlugins.onAssembly return RxJavaPlugins.onAssembly(new ObservableCreate (source)); }
先说结论,这是Rx全局hook的地方,所有的操作符都会先走hook
Hookio.reactivex.plugins.RxJavaPlugins#onAssembly
@NonNull public staticObservable onAssembly(@NonNull Observable source) { Function super Observable, ? extends Observable> f = onObservableAssembly; //如果我们不设置onObservableAssembly,则不会有变动 if (f != null) { return apply(f, source); } return source; }
我们如果给上面的代码加上下面代码
RxJavaPlugins.setOnObservableAssembly(new Function() { @Override public Observable apply(@NonNull Observable observable) throws Exception { Log.e("SimpleRx hook", observable+""); return observable; } }); //执行结果 //2022-02-19 11:40:34.875 15400-15400/com.hbsf.myrxjavademo E/SimpleRx hook: io.reactivex.internal.operators.observable.ObservableCreate@f946fb1 //2022-02-19 11:40:34.877 15400-15400/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽 //2022-02-19 11:40:34.877 15400-15400/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm //2022-02-19 11:40:34.878 15400-15400/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = zjm
hook,不会只拦截一次,只是我们的代码让他只拦截一次,如果加map,他也会拦截,中间的节点都会被hook
creat回到主线io.reactivex.Observable#create,查看RxJavaPlugins.onAssembly(new ObservableCreate(source))中的new
io.reactivex.internal.operators.observable.ObservableCreate#ObservableCreate()
//啥也没干,进行赋值,现在我们要记住,source是我们传进去的,这个source是一个ObservableOnSubscribe,并实现了subscribe方法 public ObservableCreate(ObservableOnSubscribesubscribesource) { this.source = source; } //我们最开始传入的source new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Log.e("SimpleRx", "数据开始流动了 data = zjm"); e.onNext("zjm"); } };
走完creat走subscribe,creat返回了一个ObservableCreate对象继承自Observable
io.reactivex.Observable#subscribe
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//hook点,只是专门hook Subscrib方法
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空处理
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//真正的执行操作,重点
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
抽象方法,我们知道之前creat,其实真正返回的是一个ObservableCreate对象
io.reactivex.Observable#subscribeActual
protected abstract void subscribeActual(Observer super T> observer);
我们到ObservableCreate去看
io.reactivex.internal.operators.observable.ObservableCreate#subscribeActual
//要清楚传过来的参数是什么,是我们的观察者,也就是控制开始和结束的那个对象
protected void subscribeActual(Observer super T> observer) {
//构造发射器
CreateEmitter parent = new CreateEmitter(observer);
//执行观察者开始方法1
observer.onSubscribe(parent);
try {
//回调被观察者
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#CreateEmitter()
CreateEmitter(Observer super T> observer) {
this.observer = observer;
}
这里说明为啥要发射器,为了降低观察者和被观察者之前的耦合,我们之前让被观察者直接用list依赖观察者,耦合度较高
执行完observer.onSubscribe(parent);,此时也就打印了 ”我们开始喽“
然后走被观察者的回调,此时打印**“数据开始流动了 data = zjm”**
@Override public void subscribe(@NonNull ObservableEmittere) throws Exception { Log.e("SimpleRx", "数据开始流动了 data = zjm"); e.onNext("zjm"); }
走发射器的onNext
io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#onNext()
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//把数据传到下层,此时这个observer就是之前我们保存的终点
if (!isDisposed()) {
observer.onNext(t);
}
}
回到终点
@Override
public void onNext(@NonNull String s) {
Log.e("SimpleRx", "要结束了喽 data = " + s);
}
此时流程结束了,此时我们发现并没有onComplete的事,所以也就没有执行
我们再理解Rx的观察者,其实对于下层来说,下层就是上层的观察者,上层就是下层的被观察者,我们之前说多个被观察者一个观察者是对于终点来说的,其实他们是相对而言的,这也是和普通观察者模式最大的区别。
map修改代码
//经上面分析我们知道creat返回ObservableCreate,传入的参数我们叫起始节点 Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Log.e("SimpleRx", "数据开始流动了 data = zjm"); e.onNext("zjm"); } }) //ObservableCreate.map 下面进行分析(先往下看),传入的参数叫转换节点 .map(new Function () { @Override public String apply(@NonNull String s) throws Exception { Log.e("SimpleRx", "我改值了哦 data = ljn"); return "ljn"; } }) //ObservableMap.subscribe,传入的参数叫终点 .subscribe(new Observer () { @Override public void onSubscribe(@NonNull Disposable d) { Log.e("SimpleRx", "我们开始喽"); } @Override public void onNext(@NonNull String s) { Log.e("SimpleRx", "要结束了喽 data = " + s); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { Log.e("SimpleRx", "真正结束了"); } }); //执行结果 //2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽 //2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm //2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我改值了哦 data = ljn //2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = ljn
io.reactivex.Observable#map
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public finalObservable map(Function super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap (this, mapper)); }
可见返回一个ObservableMap对象 继承自Observable,回到上面
也就是三步
1.Observable.create()上面已分析过,不再分析
2.ObservableCreate.map()
//io.reactivex.Observable#map @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public finalObservable map(Function super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); //this是指ObservableCreate,ObservableCreate中我们保存了起始节点 return RxJavaPlugins.onAssembly(new ObservableMap (this, mapper)); } //io.reactivex.internal.operators.observable.ObservableMap public ObservableMap(ObservableSource source, Function super T, ? extends U> function) { //保存source super(source); // 方法实现是this.source = source; this.function = function; } //执行完毕后,我们想一下此时map的结构 //this.source 是我们Observable.create()返回的对象,ObservableCreate里的source 是我们最开始传入的起始节点 //this.function 是我们的转换节点
3.ObservableMap.subscribe()
ObservableMap,ObservableCreate之类的类都继承自Observable,subscribe是在父类中实现的,上边已经分析过,subscribe中调用了subscribeActual,subscribeActual是抽象方法给子类去实现的,此时会进入ObservableMap的subscribeActual
io.reactivex.internal.operators.observable.ObservableMap#subscribeActual
@Override
public void subscribeActual(Observer super U> t) {
//记住source是我们ObservableCreate对象
source.subscribe(new MapObserver(t, function));
}
先看new MapObserver
t是终点,function是转换节点,打包成MapObserver
io.reactivex.internal.operators.observable.ObservableMap.MapObserver#MapObserver
MapObserver(Observer super U> actual, Function super T, ? extends U> mapper) {
//保存节点
super(actual);
this.mapper = mapper;
}
再看source.subscribe(new MapObserver
//之前已经分析过,只是参数有区别,治理参数是我们的MapObserver,MapObserver中的类结构
//this.actual 是我们的终点
//this.mapper 是我们的转换节点
@Override
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);//等价于MapObserver.onSubscribe,MapObserver继承自BasicFuseableObserver,也就是说走的BasicFuseableObserver.onSubscribe
try {
//执行起始节点的方法
//输出 2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
io.reactivex.internal.observers.BasicFuseableObserver#onSubscribe
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable)s;
}
if (beforeDownstream()) {
//actual是我们的终点,此时我们将输出
//2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽
actual.onSubscribe(this);
afterDownstream();
}
}
}
之前分析过source.subscribe(parent);,此时略过,我们进入我们自己写的回调
//起始节点 @Override public void subscribe(@NonNull ObservableEmittere) throws Exception { Log.e("SimpleRx", "数据开始流动了 data = zjm"); e.onNext("zjm"); }
继续往下走e.onNext(“zjm”);,e是我们传过来的CreateEmitter
//io.reactivex.internal.operators.observable.CreateEmitter#onNext
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//这个observer是啥,我们之前分析过,是我们的MapObserver,t是字符串“zjm”
observer.onNext(t);
}
}
//io.reactivex.internal.operators.observable.ObservableMap.MapObserver#onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//mapper是我们转换节点,此时就会进入我们的apply, mapper.apply(t)就是"ljn"
//输出 2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我改值了哦 data = ljn
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//actual,终点
//输出2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = ljn
actual.onNext(v);
}
到这流程结束
让我们迷糊的就是类的source和observer两个属性,source永远是上层,observer永远是下层
总结,我们不断点执行方法的过程其实就是给下层的source属性不断赋值的过程,当调用subscribe后,会把下层通过source交给上层的observer属性(对于上层来讲下面的节点就是observer),然后我们执行onSubscribe,此时会把我们的observer一层层再往下回调(对于上述例子的操作符是这么运行的,别的操作符会有区别),然后再执行我们自己的数据操作
点的过程就是往下封包的过程,我们调用subscribe,是往上封包的过程,onSubscribe的调用是往下解包的过程
线程切换的原理 subscribeOn.subscribeOn(Schedulers.io())这句话的原理
Schedulers.io()
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
//策略,IO,
//还有别的 比如static final Scheduler NEW_THREAD;,我们之分析IO
static final Scheduler IO;
//hook最终返回IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());
//hook,不用管
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
//多层封装,不太重点
static final class IOTask implements Callable {
//call方法没有直接调用是通过callRequireNonNull来调用的
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
//多层封装,也不太重点
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//多层封装,也不太重点
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
//多层封装,也不太重点
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference(NONE);
start();
}
//多层封装,也不太重点
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//重点,这我们就找到了线程池的初始化
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentlinkedQueue();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future> task = null;
if (unit != null) {
//创建线程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
到这我们只要记得Schedulers.io()本质就是一个线程池
subscribeOn返回ObservableSubscribeOn
public final ObservablesubscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn (this, scheduler)); }
此时执行subscribe,也就相当于ObservableSubscribeOn.subscribe最终执行ObservableSubscribeOn.subscribeActual
@Override
public void subscribeActual(final Observer super T> s) {
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
//直接回调重点的onSubscribe,在主线程执行
s.onSubscribe(parent);
//将终点打包,交给线程池
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
看一下怎么交给线程池的
new SubscribeTask(parent)
//只要我们把SubscribeTask交给线程池,执行run以后以后就的操作将全部异步执行
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver parent;
SubscribeTask(SubscribeOnObserver parent) {
this.parent = parent;
}
@Override
public void run() {
//异步订阅,继续往上走,这是第二次往上封包的过程
source.subscribe(parent);
}
}
scheduler.scheduleDirect(new SubscribeTask(parent))
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//public abstract Worker createWorker();抽象方法,不同的策略实现并不相同,IO的实现类是IoScheduler
final Worker w = createWorker();//new EventLoopWorker(pool.get());
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//真正的执行者
w.schedule(task, delay, unit);
return task;
}
//EventLoopWorker.schedule
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//scheduleActual中真正交给线程池去执行
//if (delayTime <= 0) {
// f = executor.submit((Callable
subscribeOn分析没有那么细致,其实就是把把后续subscribe的过程交给异步去执行,我们知道subscribe有两步,往上封包,往下解包,subscribeOn是在往上封包的时候就转换了线程(subscribeOn下方的包是在主线程封好的)。
observeOn我们接着分析observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
AndroidSchedulers.mainThread()其实就是一个封装了主线程handler的一个类,具体细节不用管,只知道他创建了一个主线程的handler
我们这就不再很细的分析,其实只需要看observeOn返回对象的subscribeActual方法就知道他是怎么切换线程的了
public final ObservableobserveOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn (this, scheduler, delayError, bufferSize)); }
不关心hook,其实返回了一个ObservableObserveOn对象,进去看一下subscribeActual
io.reactivex.internal.operators.observable.ObservableObserveOn#subscribeActual,
protected void subscribeActual(Observer super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker(); //scheduler是HandlerScheduler,w也就是HandlerWorker
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}
很奇怪欸,竟然没有任何线程操作,其实我们要根据subscribeOn去猜测,subscribeOn是在往上封包开始切换线程,observeOn很可能是回来的时候,也就是拆包的时候,拆包执行的是onNext,onNext又是由每个包裹实现的
//io.reactivex.internal.operators.observable.ObservableObserveOn.ObserveOnObserver#onNext
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
//io.reactivex.internal.operators.observable.ObservableObserveOn.ObserveOnObserver#schedule
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
他把this传过去,那么果不其然
static final class ObserveOnObserverextends BasicIntQueueDisposable implements Observer , Runnable
包装类继承了Runnable,我们再看一下他的run方法
public void run() {
if (outputFused) {
drainFused();
} else {
//一般走这个
drainNormal();
}
}
省去不必要的代码,只看最关键的一步
void drainNormal() {
a.onNext(v);
}
这时候我们知道只要把这个包裹交给handler那么他的onNext就会在主线程执行,接下来再看怎么给主线程的,继续分析worker.schedule(this)
我们之前知道我们调用observeOn传入的是HandlerScheduler,那么其创建的worker也就是HandlerWorker,之前也注释过了
io.reactivex.android.schedulers.HandlerScheduler.HandlerWorker#schedule
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
//下面四行是重点,封装Message,发送Message
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
总结:subscribeOn拦截上行(subscribe),observeOn拦截下行(doNext),本质都是把任务封装成runnable交给线程池或者主线程的handler



