栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Rxjava功能操作符的使用方法详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Rxjava功能操作符的使用方法详解

Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;
 
import com.alibaba.fastjson.JSONObject;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
 
public class MainActivity extends AppCompatActivity {
 
  private TextView name;
 
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
 
    name = (TextView) findViewById(R.id.name);
    //用来调用下面的方法,监听。
    name.setonClickListener(new View.onClickListener() {
      @Override
      public void onClick(View v) {
 
 interval();
      }
    });
  }
 
  //例1:Observer
  public void observer() {
    //观察者
    Observer observer = new Observer() {
      @Override
      public void onSubscribe(@NonNull Disposable d) {
 
      }
      @Override
      public void onNext(@NonNull String s) {
 //接收从被观察者中返回的数据
 System.out.println("onNext :" + s);
      }
      @Override
      public void onError(@NonNull Throwable e) {
 
      }
      @Override
      public void onComplete() {
 
      }
    };
    //被观察者
    Observable observable = new Observable() {
      @Override
      protected void subscribeActual(Observer observer) {
 observer.onNext("11111");
 observer.onNext("22222");
 observer.onComplete();
      }
    };
    //产生了订阅
    observable.subscribe(observer);
  }
 
  //例2:Flowable
  private void flowable(){
    //被观察者
    Flowable.create(new FlowableOnSubscribe() {
      @Override
      public void subscribe(@NonNull FlowableEmitter e) throws Exception {
 for (int i = 0; i < 100; i++) {
   e.onNext(i+"");
 }
      }
      //背压的策略,buffer缓冲区 观察者
      //背压一共给了五种策略
      // BUFFER、
      // DROP、打印前128个,后面的删除
      // ERROR、
      // LATEST、打印前128个和最后一个,其余删除
      // MISSING
      //这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误
    }, BackpressureStrategy.BUFFER).subscribe(new Consumer() {
      @Override
      public void accept(String s) throws Exception {
 System.out.println("subscribe accept"+s);
 Thread.sleep(1000);
      }
    });
  }
 
  //例3:线程调度器 Scheduler
  public void flowable1(){
    Flowable.create(new FlowableOnSubscribe() {
      @Override
      public void subscribe(@NonNull FlowableEmitter e) throws Exception {
 for (int i = 0; i < 100; i++) {
   //输出在哪个线程
   System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
   e.onNext(i+"");
 }
      }
    },BackpressureStrategy.BUFFER)
 //被观察者一般放在子线程
 .subscribeOn(Schedulers.io())
 //观察者一般放在主线程
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Consumer() {
   @Override
   public void accept(String s) throws Exception {
     System.out.println("s"+ s);
     Thread.sleep(100);
     //输出在哪个线程
     System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
   }
 });
  }
 
 
  //例4:http请求网络,map转化器,fastjson解析器
  public void map1(){
    Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(@NonNull final ObservableEmitter e) throws Exception {
 OkHttpClient client = new OkHttpClient();
 Request request = new Request.Builder()
     .url("https://qhb.2dyt.com/Bwei/login")
     .build();
 client.newCall(request).enqueue(new Callback() {
   @Override
   public void onFailure(Call call, IOException e) {
 
   }
 
   @Override
   public void onResponse(Call call, Response response) throws IOException {
     String result = response.body().string();
     e.onNext(result);
   }
 });
      }
    })
 //map转换器 flatmap(无序),concatmap(有序)
 .map(new Function() {
      @Override
      public Bean apply(@NonNull String s) throws Exception {
 //用fastjson来解析数据
 return JSONObject.parseObject(s,Bean.class);
      }
    }).subscribe(new Consumer() {
      @Override
      public void accept(Bean bean) throws Exception {
 System.out.println("bean = "+ bean.toString() );
      }
    });
  }
 
  //常见rxjava操作符
  //例 定时发送消息
  public void interval(){
    Observable.interval(2,1, TimeUnit.SECONDS)
 .take(10)
 .subscribe(new Consumer() {
   @Override
   public void accept(Long aLong) throws Exception {
     System.out.println("aLong = " + aLong);
   }
 });
  }
 
 
  //例 zip字符串合并
  public void zip(){
    Observable observable1 = Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(@NonNull ObservableEmitter e) throws Exception {
 e.onNext("1");
 e.onNext("2");
 e.onNext("3");
 e.onNext("4");
 e.onComplete();
 
      }
    });
    Observable observable2 = Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(@NonNull ObservableEmitter e) throws Exception {
 e.onNext("A");
 e.onNext("B");
 e.onNext("C");
 e.onNext("D");
 e.onComplete();
      }
    });
 
    Observable.zip(observable1, observable2, new BiFunction() {
      @Override
      public String apply(@NonNull String o, @NonNull String o2) throws Exception {
 return o + o2;
      }
    }).subscribe(new Consumer() {
      @Override
      public void accept(String o) throws Exception {
 System.out.println("o"+ o);
      }
    });
  }

总结

以上就是本文关于Rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Javaweb应用使用限流处理大量的并发请求详解、分享一个简单的java爬虫框架、Java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/143240.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号