RxJava的学习

架构师之路 : https://www.jianshu.com/p/40a4b195a12a

一、RxJava1基本的元素

   1.Observable

   2.Observer

   3.Subscription

   4.OnSubscribe

   5.Subscriber

二、RxJava2基本元素

  1.Observable和Flowable

  2.Observer和Subsrciber

  3.Disposable和Subscription

  4.相应的OnSubscribe

  5.Emitter

三、RxJava的创建Observable操作符讲解

  1.create : 

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            Log.d(TAG,"Observable Test");
            //定义事件队列
            subscriber.onNext("aaaa");
            subscriber.onCompleted();
        }
    });

  2. just :  将为你创建一个Observable并自动为你调用onNext( )发射数据

Observable<String> observable =  Observable.just("hello");
observable.subscribe(subscriber);

  3 . from :  传入的数组Iterable拆分成具体对象后

   //定义要发送的事件集合
    List<String> mList = new ArrayList<String>();
    mList.add("aaaa");
    mList.add("bbbb");
    mList.add("cccc");
    mList.add("dddd");
    //定义Observable
    Observable<String> observable = Observable.from(mList);
    //进行订阅,开始发送事件
    observable.subscribe(subscriber);

  4. defer :  通过defer()方法创建Observable,当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable

 //定义一个被观察者
    Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {

                Observable<String> mObservable = Observable.create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("事件订阅开始");
                    }
                });
                return mObservable;
            }
        });
        //订阅事件1,没产生一个订阅就会生成一个新的observable对象
        observable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG,"观察者2订阅事件    "+s);
            }
        });
        //订阅事件2,没产生一个订阅就会生成一个新的observable对象
        observable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG,"观察者1订阅事件    "+s);
            }
        });

  5 . interval :  创建一个按固定时间间隔发射整数序列的Observable,可用作定时器

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
        observable.subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.d(TAG,"订阅时长为"+aLong);
            }
        });

  6 . timer : 该方法可以在一定延迟之后发送特定事件

//定义被观察者,在2000毫秒后开始发送事件
    Observable<Long> observable = Observable.timer(2000,TimeUnit.MILLISECONDS);
        Subscriber<Long> subscriber = new Subscriber<Long>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG,"收到事件啦");
            }
        };
        Log.d(TAG,"开始订阅发送事件");
        observable.subscribe(subscriber);  

四、RxJava转换Observable的操作符

  1.Map : map操作符就是通过制定一个Func1对象,将原Observable对象转换为另一个Observable对象并发射

       //定义初始事件序列数组
        Integer[] ints = {1, 2, 3};
        //调用from逐个发射数据
        //map方法把Intger类型转换为String类型
        Observable.from(ints).map(new Func1<Integer, String>() {
            @Override
            public String call(Integer i) {
                //对Integer数据进行处理,转换成String类型返回
                return i + "号玩家";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG,s+"加入游戏");
            }
        });

  2.FlatMap : flatMap也是用来做类型转换,不同于map的是,flatMap转换后得到的是一个Observable对象

//interval方法创建Observable对象,每隔1秒发送一个事件
//经过flatMap方法变化,将long类型的事件变换为一个新的Observable对象发射出去
subscription = Observable.interval(1, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<String>>() {
            @Override
            public Observable<String> call(final Long aLong) {
                return Observable.create(new Observable.OnSubscribe<String>(){
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.d("testRX---next","test"+aLong);
                        subscriber.onNext(aLong+"");
                    }
                });
            }
            //定义Subscriber对变换后的事件进行接收
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext" + s);
            }
        });

  3.GroupBy

  4.Buffer :  buffer操作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射

Observale.just(1,2,3,4,5,6).buffer(3).subscribe(new Action1<List<Integer>>() {
    

  5.Scan

  6.Window

五、RxJava过滤Obserable的操作符

  1.Debounce

  2.Distinct : 去掉重复的数据项     1,2,3,2,3,4  ---->1,2,3,4

  3.ElementAt :  打印指定数据项    A : 1,2,3,4   A.ElementAt(3)  -----> 4

  4.Filter  :  过滤掉指定的判断       A : 1,2,3,4,5,6  if(数据源>3)  ----> 4,5,6

  5.First :  得到数据项的第一个数据    A : 1,2,3,4,5   A.Frist() ------>1

  6.IgnoreElements  :  获得Observable的对象,忽略onNext的方法,直接返回onComplete或者onError的方法

  7.Last : 得到数据项的最后一个数据    A : 1,2,3,4,5   A.Last() -------> 5

  8.Sample : 

  9.Skip : 跳过指定数据项的前几项     A: 1,2,3,4,5  A.Skip(2)  ----> 3,4,5

  10.SkipLast : 跳过指定数据的后几项   A:1,2,3,4,5  A.SkipLast(2) ---->1,2,3

  11.Take : 拿到指定数据的前几项       A: 1,2,3,4,5  A.Take(2)  ---->1,2

  12.TakeLast : 拿到指定数据的后几项  A:1,2,3,4,5  A.TakeLast(2) ------>4,5

六、RxJava组合Observable的操作符

  1.Zip : 对应项进行相应的合并,没有直接合并的直接抛弃 1,2,3    4,5,6,7  -----> 1+4=5  2+5=7   3+6=9

  2.Merge : 把所有的数据源都合并为总的数据源   1,2,3   4,5,6  -----> 1,2,3,4,5,6

  3.StartWith :  在原来数据源基础之前添加想要的数据源    A : 1 2 3   B : 4,5,6 ------->   B.StartWith(A)   4,5,6,1,2,3

  4.CombineLatest :  1,3,5    2,4,6 ---->  5 + 2    5 + 4    5 + 6   最后一个数据源和下一个数据源进行相应的操作

  5.Join

  6.SwitchOnNext

七、RxJava错误处理的操作符

  1. onErrorReturn : 

  2.onErrorResumeNext : 
  3. onExceptionResumeNext  : 和上面两个区别是:它可以捕获到异常,上面是不能捕获到异常信息的

  4.retry : 当有一个错误发生时候,尝试回复正常

  5.retryWhen :  当有一个异常发生之后,直到捕获到异常时候,才去报那个异常

八、线程的调度器

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

  • subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。
  • observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。
    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, Thread.currentThread().getName());
                subscriber.onNext("aaaa");
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {

                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted");
                        Log.d(TAG, Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext " + s);
                    }
                });
原文地址:https://www.cnblogs.com/liunx1109/p/11988924.html