栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 移动开发 > Android

RxJava取消订阅的各种方式的实现

Android 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RxJava取消订阅的各种方式的实现

手动取消订阅

Consumer类型

Observable创建返回Disposable取消

public class SecondActivity extends AppCompatActivity {

  private static final String TAG = "SecondActivity";
  private Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_second);
    disposable = Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(ObservableEmitter emitter) throws Exception {
 try {
   Thread.sleep(5000);
 } catch (InterruptedException e) {
   e.printStackTrace();
 }
      }
    }).subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Consumer() {
   @Override
   public void accept(String s) throws Exception {
     Log.d(TAG, "accept: "+s);
   }
 });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //取消订阅
    if(disposable != null && !disposable.isDisposed()){
      disposable.dispose();
      Log.d(TAG, "onDestroy: dispose");
    }
  }
}

普通类型Observer

在Observer中获取Disposable然后取消

public class ThirdActivity extends AppCompatActivity {
  private static final String TAG = "ThirdActivity";
  Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_third);
    Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(ObservableEmitter emitter) throws Exception {
 try {
   Thread.sleep(5000);
   emitter.onNext("testInfo");
 } catch (InterruptedException e) {
   e.printStackTrace();
 }
      }
    }).subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer() {
   @Override
   public void onSubscribe(Disposable d) {
     disposable = d;
   }

   @Override
   public void onNext(String s) {
     Log.d(TAG, "onNext: "+s);
   }

   @Override
   public void onError(Throwable e) {
     Log.d(TAG, "onError: ");
   }

   @Override
   public void onComplete() {
     Log.d(TAG, "onComplete: ");
   }
 });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //然后在需要取消订阅的地方调用即可
    if (disposable != null && !disposable.isDisposed()) {
      Log.d(TAG, "dispose: ");
      disposable.dispose();
    }
  }
}

DisposableObserver类型

利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消

public class FourthActivity extends AppCompatActivity {
  private static final String TAG = "FourthActivity";
  private DisposableObserver observer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_fourth);
    observer = Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(ObservableEmitter emitter) throws Exception {
 try {
   Thread.sleep(5000);
   emitter.onNext("testInfo");
 } catch (InterruptedException e) {
   e.printStackTrace();
 }
      }
    }).subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribeWith(new DisposableObserver() {
      @Override
      public void onNext(String o) {
 Log.d(TAG, "onNext: "+o);
      }

      @Override
      public void onError(Throwable e) {
 Log.d(TAG, "onError: ");
      }

      @Override
      public void onComplete() {
 Log.d(TAG, "onComplete: ");
      }
    });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    if (observer != null && !observer.isDisposed()) {
      Log.d(TAG, "dispose: ");
      observer.dispose();
    }
  }
}

取消多个Observer

把多个Observer添加CompositeDisposable,一次取消

public class ComDisposableActivity extends AppCompatActivity {

  private Disposable disposable1;
  private Disposable disposable2;
  private static final String TAG = "ComDisposableActivity";
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_com_disposable);
    Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(ObservableEmitter emitter) throws Exception {
 try {
   Thread.sleep(5000);
   emitter.onNext("testInfo");
 } catch (InterruptedException e) {
   e.printStackTrace();
 }
      }
    }).subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .doonDispose(new Action() {
   @Override
   public void run() throws Exception {
     Log.d(TAG, "run: Unsubscribing subscription from onCreate()");
   }
 })
 .subscribe(new Observer() {
   @Override
   public void onSubscribe(Disposable d) {
     disposable1 = d;
   }

   @Override
   public void onNext(String s) {
     Log.d(TAG, "onNext: "+s);
   }

   @Override
   public void onError(Throwable e) {
     Log.d(TAG, "onError: ");
   }

   @Override
   public void onComplete() {
     Log.d(TAG, "onComplete: ");
   }
 });
    Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(ObservableEmitter emitter) throws Exception {
 try {
   Thread.sleep(5000);
   emitter.onNext("testInfo");
 } catch (InterruptedException e) {
   e.printStackTrace();
 }
      }
    }).subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer() {
   @Override
   public void onSubscribe(Disposable d) {
     disposable2 = d;
   }

   @Override
   public void onNext(String s) {
     Log.d(TAG, "onNext: "+s);
   }

   @Override
   public void onError(Throwable e) {
     Log.d(TAG, "onError: ");
   }

   @Override
   public void onComplete() {
     Log.d(TAG, "onComplete: ");
   }
 });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    //批量添加
    compositeDisposable.add(disposable1);
    compositeDisposable.add(disposable2);
    //最后一次性全部取消订阅
    compositeDisposable.dispose();
  }
}

RxLifecyle取消

OnDestory取消

Observable.interval(1, TimeUnit.SECONDS)
 .doonDispose(new Action() {
   @Override
   public void run() throws Exception {
     Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
   }
 })
 .compose(this.bindToLifecycle())
 .subscribe(new Consumer() {
   @Override
   public void accept(Long num) throws Exception {
     Log.d(TAG, "accept: " + num);
   }
 });

指定生命周期取消

Observable.interval(1,TimeUnit.SECONDS)
 .doonDispose(new Action() {
   @Override
   public void run() throws Exception {
     Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
   }
 }).compose(this.bindUntilEvent(ActivityEvent.PAUSE))
 .subscribe(new Consumer() {
   @Override
   public void accept(Long aLong) throws Exception {
     Log.d(TAG, "bindUntilEvent accept: " + aLong);
   }
 });

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/154781.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号