Map操作符号其实就是新生成一个 MapObservable来转换处理数据,然后将数据发射给 MapObserver ,待MapObserver数据处理好之后,才会最终调用自定义的 Observer对象,这个过程实际就是利用了Java里面多态的特性。
2:Map操作符源码 2.1 :MapObservable源码Observable.create(new ObservableOnSubscribe2.2 Map操作符源码() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("one"); emitter.onNext("two"); emitter.onComplete(); } }).map(new Function () { @Override public Integer apply(String s) throws Exception { return Integer.valueOf(s); } }).subscribe(new Observer () { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe:" + d.toString()); } @Override public void onNext(Integer s) { System.out.println("onNext:" + s); } @Override public void onError(Throwable e) { System.out.println("Throwable:" + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete:"); } });
public final2.3 将 Function 对象 mapper 通过 ObservableMap 传给 ObservableMap,并完成相应的赋值Observable map(Function super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap (this, mapper)); }
public final class ObservableMap2.4 :接下来的流程和 Rxjava 基本流程基本相同,执行产生订阅关系的方法 - subscribe 时调用 ObservableMap#subscribeActual:extends AbstractObservableWithUpstream { final Function super T, ? extends U> function; public ObservableMap(ObservableSource source, Function super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer super U> t) { source.subscribe(new MapObserver (t, function)); } static final class MapObserver extends BasicFuseableObserver { final Function super T, ? extends U> mapper; MapObserver(Observer super U> actual, Function super T, ? extends U> mapper) { // 将观察者对象 actual 赋值给 downstream super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { // 调用 mapper.apply ,其实是自定义 Function 中的 apply 方法 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } // 最终调用 Observer 的 onNext 方法 downstream.onNext(v); } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } .... } }
@Override
public void subscribeActual(Observer super U> t) {
// 此处的 source 为调用 map 操作符的 Observable,即上一步通过 create 创建的 Observable
(ObservableCreate)
source.subscribe(new MapObserver(t, function));
}
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook retu...);
// 由于多态的存在,此处的 subscribeActual 会调用 MapObservable 的subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't t
npe.initCause(e);
throw npe;
}
}
3: 总结
其中最重要的方法 subscribeActual 调用的为 ObservableCreate 的 subscribeActual 方法,接下来和基本流程一样会调用 ObservableCreate 的 subscribe 从而开启事件的分发,与 Rxjava 基本流程不同的是 map 操作符构建了 MapObserver,完成 MapObserver 的相关操作后,才会最终调用自定义的 Observer 对象。



