栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rxjava原理面试(rxjava使用)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rxjava原理面试(rxjava使用)

RxJava思想及原理 核心思想

现在有一个需求,下载一张照片,用户点击下载,弹出正在下载提示框,下载结束显示图片,关闭提示框

传统思维

1.封装thread

2.编写网络请求代码

3.拿到数据创建bitmap

4.handler回调,更新ui

public void downloadImageAction(View view) {
    progressDialog = new ProgressDialog(this);
    progressDialog.setTitle("下载图片中...");
    progressDialog.show();

    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                URL url = new URL(PATH);
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                httpURLConnection.setConnectTimeout(5000);
                int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    InputStream inputStream = httpURLConnection.getInputStream();
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    Message message = handler.obtainMessage();
                    message.obj = bitmap;
                    handler.sendMessage(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }).start();
}

private final Handler handler = new Handler(new Handler.Callback() {

    @Override
    public boolean handleMessage(@NonNull Message msg) {
        Bitmap bitmap = (Bitmap) msg.obj;
        image.setImageBitmap(bitmap);

        if (progressDialog != null) progressDialog.dismiss();
        return false;
    }
});
Rx思想

观察者设计模式,起点是被观察者,终点是观察者,一条流水线的思维,响应式编程

对于下载图片这个功能,用rx思想其实也是上面4步,只是他从头到尾 是一条线,rx框架把这四个操作封装成了一行代码,他提供了切换线程,转换数据的方法,看起来是一体的,不会断,不像传统思维一样散

public void rxJavaDownloadImageAction(View view) {
    // 起点
    Observable.just(PATH)  // 内部会分发  PATH Stirng  // TODO 第二步
        // TODO 第三步
        .map(new Function() {
            @Override
            public Bitmap apply(String s) throws Exception {
                URL url = new URL(PATH);
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                httpURLConnection.setConnectTimeout(5000);
                int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    InputStream inputStream = httpURLConnection.getInputStream();
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    return bitmap;
                }
                return null;
            }
        })
        //切换线程 之上的异步线程
		.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //之下的主线程
        // 订阅 起点 和 终点 订阅起来
        .subscribe(
        // 终点
        new Observer() {
            // 订阅开始
            @Override
            public void onSubscribe(Disposable d) {
                // 预备 开始 要分发
                // TODO 第一步
                progressDialog = new ProgressDialog(DownloadActivity.this);
                progressDialog.setTitle("download run");
                progressDialog.show();
            }

            // TODO 第四步
            // 拿到事件
            @Override
            public void onNext(Bitmap bitmap) {
                image.setImageBitmap(bitmap);
            }
            // 错误事件
            @Override
            public void onError(Throwable e) {

            }
            // TODO 第五步
            // 完成事件
            @Override
            public void onComplete() {
                if (progressDialog != null)
                    progressDialog.dismiss();
            }
        });
}

如果现在想给图片加水印的话,这个过程也不会断,只需要在map后边再加一个map

.map(new Function() {
    @Override
    public Bitmap apply(Bitmap bitmap) throws Exception {
        Paint paint = new Paint();
        paint.setTextSize(88);
        paint.setColor(Color.RED);
        return drawTextToBitmap(bitmap, "同学们大家好",paint, 88 , 88);
    }
})

这就是rx的思想,从字符串到照片,再到带水印的照片他是一条线,不想要就可以直接删掉,方便拆卸

上边代码还可以改进,封装线程切换的操作

ObservableTransformer是RxJava提供的转换器,可以将一系列操作进行封装

public final static  ObservableTransformer rxud() {
    return new ObservableTransformer() {
        @Override
        public ObservableSource apply(Observable upstream) {
            return  upstream.subscribeOn(Schedulers.io())     // 给上面代码分配异步线程
            .observeOn(AndroidSchedulers.mainThread()) // 给下面代码分配主线程;
            .map(new Function() {
                @Override
                public UD apply(UD ud) throws Exception {
                    Log.d(TAG, "apply: 我监听到你了,居然再执行");
                    return ud;
                }
            });
        }
    };
}

//.subscribeOn(Schedulers.io())
//.observeOn(AndroidSchedulers.mainThread())
//上面两行代码可以换成
//.compose(rxud())

注意事项:Rx使用过程中要记得销毁,否则内存泄漏

@Override
protected void onDestroy() {
    super.onDestroy();
    if (disposable != null && !disposable.isDisposed()) {
        disposable.dispose();
    }
}
实战

嵌套查询,功能防抖

flatMap和doOnNext的使用

demo链接:https://pan.baidu.com/s/1cUV6Dg9XYaJNRoCHATRXFQ?pwd=fymf
提取码:ucpe

原理 改进的观察者设计模式 标准的观察者模式

就比如公众号,如果说作者发布了一条消息,关注他的都可以收到消息,对应的我们用户就是观察者,而公众号就是被观察者,也就是说我们观察者要到被观察者进行注册(容器存储),发生改变以后,通知观察者做出响应

特征:一个被观察者多个观察者

Rx观察者模式

就比如下载图片,加水印,并展示的需求,对于Rx来说终点只有一个,也就是终点观察上方的活动,每一个节点都是与一个被观察者

特征:多个被观察者一个观察者

源码分析

我们先看一段简单的代码

Observable.create(new ObservableOnSubscribe() {

    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("zjm");
    }
})
    .subscribe(new Observer() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.e("SimpleRx", "我们开始喽");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.e("SimpleRx", "要结束了喽");
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {
            Log.e("SimpleRx", "真正结束了");
        }
    });

//打印结果

//2022-02-19 11:31:05.459 15258-15258/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽
//2022-02-19 11:31:05.459 15258-15258/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm
//2022-02-19 11:31:05.460 15258-15258/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = zjm
//onComplete未执行?后边解释

其实就是两个方法create和subscribe

跟着结果去查看源码

io.reactivex.Observable#create

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static  Observable create(ObservableOnSubscribe source) {
    //异常判断
    ObjectHelper.requireNonNull(source, "source is null");
    //现在抛开主流程,我们着重分析一下RxJavaPlugins.onAssembly
    return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}

先说结论,这是Rx全局hook的地方,所有的操作符都会先走hook

Hook

io.reactivex.plugins.RxJavaPlugins#onAssembly

@NonNull
public static  Observable onAssembly(@NonNull Observable source) {
    Function f = onObservableAssembly;
    //如果我们不设置onObservableAssembly,则不会有变动
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我们如果给上面的代码加上下面代码

RxJavaPlugins.setOnObservableAssembly(new Function() {
    @Override
    public Observable apply(@NonNull Observable observable) throws Exception {
        Log.e("SimpleRx hook", observable+"");
        return observable;
    }
});

//执行结果
//2022-02-19 11:40:34.875 15400-15400/com.hbsf.myrxjavademo E/SimpleRx hook: io.reactivex.internal.operators.observable.ObservableCreate@f946fb1
//2022-02-19 11:40:34.877 15400-15400/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽
//2022-02-19 11:40:34.877 15400-15400/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm
//2022-02-19 11:40:34.878 15400-15400/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = zjm

hook,不会只拦截一次,只是我们的代码让他只拦截一次,如果加map,他也会拦截,中间的节点都会被hook

creat

回到主线io.reactivex.Observable#create,查看RxJavaPlugins.onAssembly(new ObservableCreate(source))中的new

io.reactivex.internal.operators.observable.ObservableCreate#ObservableCreate()

//啥也没干,进行赋值,现在我们要记住,source是我们传进去的,这个source是一个ObservableOnSubscribe,并实现了subscribe方法
public ObservableCreate(ObservableOnSubscribe source) {
    this.source = source;
}

//我们最开始传入的source
new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        Log.e("SimpleRx", "数据开始流动了 data = zjm");
        e.onNext("zjm");
    }
};
subscribe

走完creat走subscribe,creat返回了一个ObservableCreate对象继承自Observable

io.reactivex.Observable#subscribe

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //hook点,只是专门hook Subscrib方法
        observer = RxJavaPlugins.onSubscribe(this, observer);
		//判空处理
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
		//真正的执行操作,重点
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

抽象方法,我们知道之前creat,其实真正返回的是一个ObservableCreate对象

io.reactivex.Observable#subscribeActual

protected abstract void subscribeActual(Observer observer);

我们到ObservableCreate去看

io.reactivex.internal.operators.observable.ObservableCreate#subscribeActual

//要清楚传过来的参数是什么,是我们的观察者,也就是控制开始和结束的那个对象
protected void subscribeActual(Observer observer) {
    //构造发射器
    CreateEmitter parent = new CreateEmitter(observer);
    //执行观察者开始方法1
    observer.onSubscribe(parent);

    try {
        //回调被观察者
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#CreateEmitter()

CreateEmitter(Observer observer) {
    this.observer = observer;
}

这里说明为啥要发射器,为了降低观察者和被观察者之前的耦合,我们之前让被观察者直接用list依赖观察者,耦合度较高

执行完observer.onSubscribe(parent);,此时也就打印了 ”我们开始喽“

然后走被观察者的回调,此时打印**“数据开始流动了 data = zjm”**

@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
    Log.e("SimpleRx", "数据开始流动了 data = zjm");
    e.onNext("zjm");
}

走发射器的onNext

io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#onNext()

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    //把数据传到下层,此时这个observer就是之前我们保存的终点
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

回到终点

@Override
public void onNext(@NonNull String s) {
    Log.e("SimpleRx", "要结束了喽 data = " + s);
}

此时流程结束了,此时我们发现并没有onComplete的事,所以也就没有执行

我们再理解Rx的观察者,其实对于下层来说,下层就是上层的观察者,上层就是下层的被观察者,我们之前说多个被观察者一个观察者是对于终点来说的,其实他们是相对而言的,这也是和普通观察者模式最大的区别。

map

修改代码

//经上面分析我们知道creat返回ObservableCreate,传入的参数我们叫起始节点
Observable.create(new ObservableOnSubscribe() {

    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        Log.e("SimpleRx", "数据开始流动了 data = zjm");
        e.onNext("zjm");
    }
})
    //ObservableCreate.map 下面进行分析(先往下看),传入的参数叫转换节点
    .map(new Function() {
        @Override
        public String apply(@NonNull String s) throws Exception {
            Log.e("SimpleRx", "我改值了哦 data = ljn");
            return "ljn";
        }
    })
    //ObservableMap.subscribe,传入的参数叫终点
    .subscribe(new Observer() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.e("SimpleRx", "我们开始喽");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.e("SimpleRx", "要结束了喽 data = " + s);
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {
            Log.e("SimpleRx", "真正结束了");
        }
    });

//执行结果
//2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽
//2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm
//2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我改值了哦 data = ljn
//2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = ljn

io.reactivex.Observable#map

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final  Observable map(Function mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}

可见返回一个ObservableMap对象 继承自Observable,回到上面

也就是三步

1.Observable.create()上面已分析过,不再分析

2.ObservableCreate.map()

//io.reactivex.Observable#map
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final  Observable map(Function mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //this是指ObservableCreate,ObservableCreate中我们保存了起始节点
    return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}

//io.reactivex.internal.operators.observable.ObservableMap
public ObservableMap(ObservableSource source, Function function) {
    //保存source 
    super(source); // 方法实现是this.source = source;
    this.function = function;
}
//执行完毕后,我们想一下此时map的结构
//this.source 是我们Observable.create()返回的对象,ObservableCreate里的source 是我们最开始传入的起始节点
//this.function 是我们的转换节点

3.ObservableMap.subscribe()

ObservableMap,ObservableCreate之类的类都继承自Observable,subscribe是在父类中实现的,上边已经分析过,subscribe中调用了subscribeActual,subscribeActual是抽象方法给子类去实现的,此时会进入ObservableMap的subscribeActual

io.reactivex.internal.operators.observable.ObservableMap#subscribeActual

@Override
public void subscribeActual(Observer t) {
    //记住source是我们ObservableCreate对象
    source.subscribe(new MapObserver(t, function));
}

先看new MapObserver(t, function)

t是终点,function是转换节点,打包成MapObserver

io.reactivex.internal.operators.observable.ObservableMap.MapObserver#MapObserver

MapObserver(Observer actual, Function mapper) {
	//保存节点
    super(actual);
    this.mapper = mapper;
}

再看source.subscribe(new MapObserver(t, function));,ObservableCreate的subscribe也会走到subscribeActual

//之前已经分析过,只是参数有区别,治理参数是我们的MapObserver,MapObserver中的类结构
//this.actual 是我们的终点
//this.mapper 是我们的转换节点
@Override
protected void subscribeActual(Observer observer) {
    CreateEmitter parent = new CreateEmitter(observer);
    observer.onSubscribe(parent);//等价于MapObserver.onSubscribe,MapObserver继承自BasicFuseableObserver,也就是说走的BasicFuseableObserver.onSubscribe
    try {
        //执行起始节点的方法
        //输出 2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 数据开始流动了 data = zjm
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

io.reactivex.internal.observers.BasicFuseableObserver#onSubscribe

@Override
public final void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {

        this.s = s;
        if (s instanceof QueueDisposable) {
            this.qs = (QueueDisposable)s;
        }
        if (beforeDownstream()) {
            //actual是我们的终点,此时我们将输出
            //2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我们开始喽
            actual.onSubscribe(this);
            afterDownstream();
        }

    }
}

之前分析过source.subscribe(parent);,此时略过,我们进入我们自己写的回调

//起始节点
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
    Log.e("SimpleRx", "数据开始流动了 data = zjm");
    e.onNext("zjm");
}

继续往下走e.onNext(“zjm”);,e是我们传过来的CreateEmitter

//io.reactivex.internal.operators.observable.CreateEmitter#onNext
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        //这个observer是啥,我们之前分析过,是我们的MapObserver,t是字符串“zjm”
        observer.onNext(t);
    }
}

//io.reactivex.internal.operators.observable.ObservableMap.MapObserver#onNext
@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        actual.onNext(null);
        return;
    }
    U v;
    try {
        //mapper是我们转换节点,此时就会进入我们的apply, mapper.apply(t)就是"ljn"
        //输出 2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 我改值了哦 data = ljn
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    //actual,终点
    //输出2022-02-19 14:30:02.603 16324-16324/com.hbsf.myrxjavademo E/SimpleRx: 要结束了喽 data = ljn
    actual.onNext(v);
}

到这流程结束

让我们迷糊的就是类的source和observer两个属性,source永远是上层,observer永远是下层

总结,我们不断点执行方法的过程其实就是给下层的source属性不断赋值的过程,当调用subscribe后,会把下层通过source交给上层的observer属性(对于上层来讲下面的节点就是observer),然后我们执行onSubscribe,此时会把我们的observer一层层再往下回调(对于上述例子的操作符是这么运行的,别的操作符会有区别),然后再执行我们自己的数据操作

点的过程就是往下封包的过程,我们调用subscribe,是往上封包的过程,onSubscribe的调用是往下解包的过程

线程切换的原理 subscribeOn

.subscribeOn(Schedulers.io())这句话的原理

Schedulers.io()

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

//策略,IO,
//还有别的 比如static final Scheduler NEW_THREAD;,我们之分析IO
static final Scheduler IO;

//hook最终返回IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());

//hook,不用管
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function f = onIoHandler;
    if (f == null) {
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}

//多层封装,不太重点
static final class IOTask implements Callable {
    //call方法没有直接调用是通过callRequireNonNull来调用的
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

//多层封装,也不太重点
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

//多层封装,也不太重点
public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

//多层封装,也不太重点
public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference(NONE);
    start();
}

//多层封装,也不太重点
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

//重点,这我们就找到了线程池的初始化
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
    this.expiringWorkerQueue = new ConcurrentlinkedQueue();
    this.allWorkers = new CompositeDisposable();
    this.threadFactory = threadFactory;

    ScheduledExecutorService evictor = null;
    Future task = null;
    if (unit != null) {
        //创建线程池
        evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
        task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
    }
    evictorService = evictor;
    evictorTask = task;
}

到这我们只要记得Schedulers.io()本质就是一个线程池

subscribeOn返回ObservableSubscribeOn

public final Observable subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}

此时执行subscribe,也就相当于ObservableSubscribeOn.subscribe最终执行ObservableSubscribeOn.subscribeActual

@Override
public void subscribeActual(final Observer s) {
    final SubscribeOnObserver parent = new SubscribeOnObserver(s);
	//直接回调重点的onSubscribe,在主线程执行
    s.onSubscribe(parent);
	//将终点打包,交给线程池
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

看一下怎么交给线程池的

new SubscribeTask(parent)

//只要我们把SubscribeTask交给线程池,执行run以后以后就的操作将全部异步执行
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver parent;

    SubscribeTask(SubscribeOnObserver parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
    	//异步订阅,继续往上走,这是第二次往上封包的过程
        source.subscribe(parent);
    }
}

scheduler.scheduleDirect(new SubscribeTask(parent))

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //public abstract Worker createWorker();抽象方法,不同的策略实现并不相同,IO的实现类是IoScheduler
    final Worker w = createWorker();//new EventLoopWorker(pool.get());

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    //真正的执行者
    w.schedule(task, delay, unit);

    return task;
}

//EventLoopWorker.schedule
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    //scheduleActual中真正交给线程池去执行
    //if (delayTime <= 0) {
    //    	f = executor.submit((Callable)sr);
    // } else {
    //		f = executor.schedule((Callable)sr, delayTime, unit);
    // }
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
 

subscribeOn分析没有那么细致,其实就是把把后续subscribe的过程交给异步去执行,我们知道subscribe有两步,往上封包,往下解包,subscribeOn是在往上封包的时候就转换了线程(subscribeOn下方的包是在主线程封好的)。

observeOn

我们接着分析observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers.mainThread()

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
    new Callable() {
        @Override public Scheduler call() throws Exception {
            return MainHolder.DEFAULT;
        }
    });


static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

AndroidSchedulers.mainThread()其实就是一个封装了主线程handler的一个类,具体细节不用管,只知道他创建了一个主线程的handler

我们这就不再很细的分析,其实只需要看observeOn返回对象的subscribeActual方法就知道他是怎么切换线程的了

public final Observable observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}

不关心hook,其实返回了一个ObservableObserveOn对象,进去看一下subscribeActual

io.reactivex.internal.operators.observable.ObservableObserveOn#subscribeActual,

protected void subscribeActual(Observer observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker(); //scheduler是HandlerScheduler,w也就是HandlerWorker

        source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
    }
}

很奇怪欸,竟然没有任何线程操作,其实我们要根据subscribeOn去猜测,subscribeOn是在往上封包开始切换线程,observeOn很可能是回来的时候,也就是拆包的时候,拆包执行的是onNext,onNext又是由每个包裹实现的

//io.reactivex.internal.operators.observable.ObservableObserveOn.ObserveOnObserver#onNext
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

//io.reactivex.internal.operators.observable.ObservableObserveOn.ObserveOnObserver#schedule
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

他把this传过去,那么果不其然

static final class ObserveOnObserver extends BasicIntQueueDisposable implements Observer, Runnable

包装类继承了Runnable,我们再看一下他的run方法

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        //一般走这个
        drainNormal();
    }
}

省去不必要的代码,只看最关键的一步

void drainNormal() {
    a.onNext(v);
}

这时候我们知道只要把这个包裹交给handler那么他的onNext就会在主线程执行,接下来再看怎么给主线程的,继续分析worker.schedule(this)
我们之前知道我们调用observeOn传入的是HandlerScheduler,那么其创建的worker也就是HandlerWorker,之前也注释过了

io.reactivex.android.schedulers.HandlerScheduler.HandlerWorker#schedule

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);
    
    //下面四行是重点,封装Message,发送Message
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    return scheduled;
}

总结:subscribeOn拦截上行(subscribe),observeOn拦截下行(doNext),本质都是把任务封装成runnable交给线程池或者主线程的handler

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号