28、AndroidRxjava

Rxjava基础

RxJava的原理就是创建一个Observable对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样,把你想要处理的数据一步一步地加工

成你想要的成品,然后发射给Subscriber处理。

RxJava有4个角色Observable、Observer、Subscriber和Suject。Observable和Observer 通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。

implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'

其中RxAndroid是RxJava在Android平台的扩展。它包含了一些能够简化Android开发的工具,比如特殊的调度器(后文会提到)。

创建Observer(观察者)

它决定事件触发的时候将有怎样的行为,代码如下所示:

Subscriber subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        // 事件队列完结。当不会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
    }
    @Override
    public void onError(Throwable e) {
        // 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
    }
    @Override
    public void onNext(String s) {
        // 普通的事件。将要处理的事件添加到事件队列中。
    }
    @Override
    public void onStart() {
        // 它会在事件还未发送之前被调用,可以用于做一些准备工作。(Rxjava1.0)
    }
};

其中onCompleted、onError和onNext是必须要实现的方法,其含义如下:

• onCompleted:事件队列完结。当不会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
• onError:事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
• onNext:普通的事件。将要处理的事件添加到事件队列中。
• onStart:它会在事件还未发送之前被调用,可以用于做一些准备工作。

如果要实现简单的功能,也可以用到Observer来创建观察者。

Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        // 事件队列完结。当不会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
    }
    @Override
    public void onError(Throwable e) {
        // 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
    }
    @Override
    public void onNext(String s) {
        // 普通的事件。将要处理的事件添加到事件队列中。
    }
};

创建Observable(被观察者)

它决定什么时候触发事件以及触发怎样的事件。RxJava 使用 create 方法来创建一个Observable,并为它定义事件触发规则,如下所示:

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("legend");
        subscriber.onNext("vincent");
        subscriber.onCompleted();
    }
});

通过调用Subscriber的方法,不断地将事件添加到任务队列中。也可用just方法来实现:

Observable<String> observable = Observable.just("legend", "vincent");

还可以用from方法来实现,如下所示:

String[] words = {"legend", "vincent"};
Observable<String> observable = Observable.from(words);

订阅只需要一行代码就可以了,如下所示:

observable.subscribe(subscriber);

Rxjava不完整回调

RxJava还提供了另一种回调方式,也就是不完整回调。在讲到不完整回调之前我们首先要了解Action。

![img](file:///C:/Users/Legend/Documents/My Knowledge/temp/93f7ebed-cb13-44e3-a644-5f4b0aba2100/128/index_files/3bd4cc63-b89f-47b1-b1fa-518a1f0c2568.png)

再打开Action1源码:

public interface Action1<T> extends Action {
    void call(T t);
}

最后打开Action9源码看看:

public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
    void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}

很明显,Action后的数字代表回调的参数类型数量,所以创建Observer和订阅代码也就可以改写为下面的代码:

// 创建观察者
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.w("TAG:", "onNext" + s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
        Log.w("TAG:", "onError");
    }
};
Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
        Log.w("TAG:", "onCompleted");
    }
};
// 创建被观察者
Observable<String> observable = Observable.just("legend", "vincent");
// 订阅
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

Rxjava的Subject

Subject 既可以是一个 Observer 也可以是一个 Observerable,它是连接 Observer 和Observerable的桥梁。因此,Subject可以被理解为

Subject = Observable + Observer。RxJava提供了以下4种Subject。

https://www.cnblogs.com/yongfengnice/p/10229473.html

  • PublishSubject

    PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里会有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,

数据可能会丢失。要确保来自原始Observable的所有数据都被分发,则可以当所有观察者都已经订阅时才开始发射数据,或者改用ReplaySubject。

  • BehaviorSubject

    当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据。如果此时还没有收到任何数据,它会发射一个默认值,然后继续发射

其他任何来自原始Observable的数据。如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,但是会向Observer传递

一个异常通知。

  • ReplaySubject

    不管Observer何时订阅ReplaySubject,ReplaySubject均会发射所有来自原始Observable的数据给Observer。有不同类型的ReplaySubject,它们用于限定

Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。

  • AsyncSubject

    当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据。如果原始的Observable 因为发生了错误而终止,AsyncSubject 将不会

发射任何数据,但是会向Observer传递一个异常通知。

Rxjava操作符

RxJava操作符的类型分为创建操作符、变换操作符、过滤操作符、组合操作符、错误处理操作符、辅助操作符、条件和布尔操作符、算术和聚合操作符及连接

操作符等。这里只介绍相对常用的操作符,以及这些操作符的常规用法。

创建操作符

上方我们已经用到创建操作符create、just和from,这里就不赘述它们的用法。

还有defer、range、interval、start、repeat和timer等创建操作符。这里会讲解interval、range和repeat。

  • interval

    创建一个按固定时间间隔发射整数序列的Observable,相当于定时器,如下所示:

Observable.interval(3, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.w("TAG", "interval:" + aLong.intValue());
            }
        });

上面的代码每隔3s就会调用call方法并打印Log。

  • range

    创建发射指定范围的整数序列的Observable,可以拿来替代for循环,发射一个范围内的有序整数序列。

第一个参数是起始值,并且不小于0;第二个参数为终值,左闭右开。

Observable.range(0, 5)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", "interval:" + integer.intValue());
            }
        });

输出结果为:

TAG: interval:0
TAG: interval:1
TAG: interval:2
TAG: interval:3
TAG: interval:4

  • repeat

    创建一个N次重复发射特定数据的Observable,如下所示:

Observable.range(0, 3)
        .repeat(2)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", "interval:" + integer.intValue());
            }
        });

输出结果如下:

TAG: interval:0
TAG: interval:1
TAG: interval:2
TAG: interval:0
TAG: interval:1
TAG: interval:2

变换操作符

变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去。

变换操作符有很多,这里会讲解map、flatMap、cast、concatMap、flatMapIterable、buffer和groupBy。

  • map

    map操作符通过指定一个Func对象,将Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable处理。

/**
 * 假设我们要访问网络,Host地址时常是变化的,它有时是测试服务器地址,有时可能是正式服务器地址,
 * 但是具体界面的URL地址则是不变的。因此,我们可以用map操作符来进行转换字符操作,
 */
final String Host = "http://blog.csdn.net/";
Observable.just("itachi85").map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return Host + s;
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.w("TAG", "map:" + s);
    }
});
  • flatMap、cast

    flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦化地放进一个单独的 Observable。

cast 操作符的作用是强制将 Observable 发射的所有数据转换为指定类型。

/**
 * 假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host。
 */
final String Host = "http://blog.csdn.net/";
List<String> list = new ArrayList<>();
list.add("itachi85");
list.add("itachi86");
list.add("itachi87");
list.add("itachi88");
Observable.from(list).flatMap(new Func1<String, Observable<?>>() {
    @Override
    public Observable<?> call(String s) {
        return Observable.just(Host + s);
    }
}).cast(String.class)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.w("TAG", "flatMap:" + s);
            }
        });

首先用ArrayList存储要访问的界面URL,然后通过flatMap转换成Observable。cast操作符将Observable中的数据转换为String类型。输出结果为:

TAG: flatMap:http://blog.csdn.net/itachi85
TAG: flatMap:http://blog.csdn.net/itachi86
TAG: flatMap:http://blog.csdn.net/itachi87
TAG: flatMap:http://blog.csdn.net/itachi88

注意:flatMap的合并允许交叉,也就是说可能会交错地发送事件,最终结果的顺序可能并不是原始Observable发送时的顺序。

  • concatMap

    concatMap操作符功能与flatMap操作符一致,但是它解决了flatMap交叉问题,提供了一种能够把发射的值连续在一起的函数,而不是合并它们。

/**
 * 假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host。
 */
final String Host = "http://blog.csdn.net/";
List<String> list = new ArrayList<>();
list.add("itachi85");
list.add("itachi86");
list.add("itachi87");
list.add("itachi88");
Observable.from(list).concatMap(new Func1<String, Observable<?>>() {
    @Override
    public Observable<?> call(String s) {
        return Observable.just(Host + s);
    }
}).cast(String.class)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.w("TAG", "concatMap:" + s);
            }
        });

输出如下所示

TAG: concatMap:http://blog.csdn.net/itachi85
TAG: concatMap:http://blog.csdn.net/itachi86
TAG: concatMap:http://blog.csdn.net/itachi87
TAG: concatMap:http://blog.csdn.net/itachi88

  • flatMaplterable

    flatMapIterable操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了,如下所示:

Observable.just(1, 2, 3).flatMapIterable(new Func1<Integer, Iterable<?>>() {
    @Override
    public Iterable<?> call(Integer integer) {
        List<Integer> list = new ArrayList<>();
        list.add(integer + 1);
        return list;
    }
}).cast(Integer.class).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "flatMapIterable:" + integer);
    }
});

在上面代码注释1处对每个数都加1,因此输出结果为:

TAG: flatMapIterable:2
TAG: flatMapIterable:3
TAG: flatMapIterable:4

  • buffer

    buffer操作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。和 buffer 操作符类似

的还有 window 操作符,只不过 window操作符发射的是Observable而不是数据列表

Observable.just(1,2,3,4,5,6)
        .buffer(3)
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> integers) {
                for (Integer i : integers) {
                    Log.w("TAG", "buffer:" + i);
                }
                Log.w("TAG", "-------------------");
            }
        });

输出结果如下:

TAG: buffer:1
TAG: buffer:2
TAG: buffer:3
TAG: -------------------
TAG: buffer:4
TAG: buffer:5
TAG: buffer:6
TAG: -------------------

  • groupBy

    goupBy操作符用于分组元素,将源Observable变换成一个发射Observables的新Observable (分组后的)。它们中的每一个新Observable都发射

一组指定的数据,如下所示:

Swordman s1 = new Swordman("韦一笑", "A");
Swordman s2 = new Swordman("张三丰", "SS");
Swordman s3 = new Swordman("周芷若", "S");
Swordman s4 = new Swordman("宋远桥", "S");
Swordman s5 = new Swordman("殷梨亭", "A");
Swordman s6 = new Swordman("张无忌", "SS");
Swordman s7 = new Swordman("鹤笔翁", "S");
Swordman s8 = new Swordman("宋青书", "A");
Observable<GroupedObservable<String, Swordman>> groupedObservable
        = Observable.just(s1, s2, s3, s4, s5, s6, s7, s8)
        .groupBy(new Func1<Swordman, String>() {
            @Override
            public String call(Swordman swordman) {
                return swordman.getLevel();
            }
        });
Observable.concat(groupedObservable).subscribe(new Action1<Swordman>() {
    @Override
    public void call(Swordman swordman) {
        Log.w("TAG", "groupBy:" + swordman.getName() + "---" + swordman.getLevel());
    }
});

这里创建了《倚天屠龙记》里的8个武侠,对其按照实力等级进行划分,从高到低依次是SS、S、A。使用groupBy可以帮助我们对某一个key值进行

分组,将相同的key值数据排在一起。在这里我们的key值就是武侠的Level(实力等级)。

TAG: groupBy:韦一笑---A
TAG: groupBy:殷梨亭---A
TAG: groupBy:宋青书---A
TAG: groupBy:张三丰---S
TAG: groupBy:张无忌---S
TAG: groupBy:周芷若---S
TAG: groupBy:宋远桥---S
TAG: groupBy:鹤笔翁---S

过滤操作符

过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。

过滤操作符有很多种,这里会介绍filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst和throttleWithTimeOut。

  • filter

    filter操作符是对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者,如下所示。

Observable.just(1, 2, 3, 4).filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        // 设定大于2的数字会被返回并被提交给订阅者。
        return integer > 2;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "filter:" + integer);
    }
});

输出结果为:

TAG: filter:3
TAG: filter:4

  • elementAt

    elementAt操作符用来返回指定位置的数据。和它类似的有elementAtOrDefault(int,T),其可以允许默认值。具体代码如下所示:

Observable.just(1, 2, 3, 4).elementAt(2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "elementAt:" + integer);
    }
});

输出结果为:

TAG: elementAt:3

  • distinct

    distinct 操作符用来去重,其只允许还没有发射过的数据项通过。和它类似的还有distinctUntilChanged操作符,它用来去掉连续重复的数据。

Observable.just(1, 2, 3, 4, 1).distinct().subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "distinct:" + integer);
    }
});

输出结果为:

TAG: distinct:1
TAG: distinct:2
TAG: distinct:3
TAG: distinct:4

  • skip、take

    skip操作符将源Observable发射的数据过滤掉前n项;而take操作符则只取前n项;另外还有skipLast和takeLast操作符,则是从后面进行过滤操作。

先来看skip操作符,如下所示:

Observable.just(1, 2, 3, 4, 5, 6).skip(2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "skip:" + integer);
    }
});

输出结果为:

TAG: skip:3
TAG: skip:4
TAG: skip:5
TAG: skip:6

接下来查看take操作符,如下所示:

Observable.just(1, 2, 3, 4, 5, 6).take(2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "take:" + integer);
    }
});

输出结果为:

TAG: take:1
TAG: take:2

  • ignoreElements*(疑问)*

    ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。

Observable.just(1, 2, 3, 4).ignoreElements().subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        Log.w("TAG", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.w("TAG", "onError");
    }
    @Override
    public void onNext(Integer integer) {
        Log.w("TAG", "onNext");
    }
});

输出结果为:

TAG: onCompleted

  • throttleFirst(疑问)

    throttleFirst操作符则会定期发射这个时间段里源Observable发射的第一个数据,throttleFirst操作符默认在computation 调度器

上执行。和 throttleFirst 操作符类似的有sample操作符,它会定时地发射源Observable最近发射的数据,其他的都会被过滤掉。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 10; i++) {
            subscriber.onNext(i);
            try {
                Thread.sleep(100);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        subscriber.onCompleted();
    }
}).throttleFirst(200, TimeUnit.MICROSECONDS)
        .subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "throttleFirst:" + integer);
    }
});

每隔100ms发射一个数据。throttleFirst操作符设定的时间为200ms,因此,它会发射每隔200ms内的第一个数据,输出结果为:

TAG: throttleFirst:0
TAG: throttleFirst:1
TAG: throttleFirst:2
TAG: throttleFirst:3
TAG: throttleFirst:4
TAG: throttleFirst:5
TAG: throttleFirst:6
TAG: throttleFirst:7
TAG: throttleFirst:8
TAG: throttleFirst:9

输出结果和书本上的输出结果不一致。

  • throttleWithTimeOut

    通过时间来限流。源Observable每次发射出来一个数据后就会进行计时。如果在设定好的时间结束前源Observable有新的数据发射

出来,这个数据就会被丢弃,同时throttleWithTimeOut重新开始计时。如果每次都是在计时结束前发射数据,那么这个限流就会走向极

端:只会发射最后一个数据。其默认在 computation调度器上执行。和 throttleWithTimeOut 操作符类似的有deounce 操作符,它不仅可

以使用时间来进行过滤,还可以根据一个函数来进行限流。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 10; i++) {
            subscriber.onNext(i);
            int sleep = 100;
            if (i % 3 == 0) {
                sleep = 300;
            }
            try {
                Thread.sleep(sleep);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        subscriber.onCompleted();
    }
}).throttleWithTimeout(200, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", "throttleWithTimeout:" + integer);
            }
        });

每隔100ms发射一个数据。当发射的数据是3的倍数的时候,下一个数据就延迟到300ms再发射。这里设定的过滤时间是200ms,也就是

说发射间隔小于200ms的数据会被过滤掉。

TAG: throttleWithTimeout:0
TAG: throttleWithTimeout:3
TAG: throttleWithTimeout:6
TAG: throttleWithTimeout:9

组合操作符

组合操作符可以同时处理多个Observable来创建我们所需要的Observable。组合操作符有很多,这里介绍startWith、merge、concat、

zip和combineLastest。

  • startWith

    startWith操作符会在源Observable发射的数据前面插上一些数据,如下所示:

Observable.just(3, 4, 5).startWith(1, 2)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", "startWith:" + integer);
            }
        });

输出结果为:

TAG: startWith:1
TAG: startWith:2
TAG: startWith:3
TAG: startWith:4
TAG: startWith:5

  • merge

    merge操作符将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错。

Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(3, 4, 5);
Observable.merge(observable1, observable2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "merge:" + integer);
    }
});

输出结果为:

TAG: merge:3
TAG: merge:4
TAG: merge:5
TAG: merge:1
TAG: merge:2
TAG: merge:3

  • concat

    将多个 Obserbavle 发射的数据进行合并发射。concat 严格按照顺序发射数据,前一个Observable没发射完成是不会发射后一个Observable的数据。

Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(3, 4, 5);
Observable.concat(observable1, observable2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "concat:" + integer);
    }
});

输出结果为:

TAG: concat:1
TAG: concat:2
TAG: concat:3
TAG: concat:3
TAG: concat:4
TAG: concat:5

  • zip

    zip操作符合并两个或者多个Observable发射出的数据项,根据指定的函数变换它们,并发射一个新值。

Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<String> observable2 = Observable.just("a", "b", "c");
Observable.zip(observable1, observable2, new Func2<Integer, String, String>() {
    @Override
    public String call(Integer integer, String s) {
        return integer + s;
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.w("TAG", "zip:" + s);
    }
});

输出结果为:

TAG: zip:1a
TAG: zip:2b
TAG: zip:3c

  • combineLastest

    当两个Observable中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<String> observable2 = Observable.just("a", "b", "c");
Observable.combineLatest(observable1, observable2, new Func2<Integer, String, String>() {
    @Override
    public String call(Integer integer, String s) {
        return integer + s;
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.w("TAG", "combineLatest:" + s);
    }
});

如果其中的一个 Observable 还有数据没有发射,那么 combineLastest 操作符会将两个Observable最新发射的数据组合在一起。

TAG: combineLatest:3a
TAG: combineLatest:3b
TAG: combineLatest:3c

辅助操作符

辅助操作符可以帮助我们更加方便地处理 Observable。辅助操作符包括有很多。在这里介绍delay、Do、subscribeOn、observeOn和timeout。

  • delay

    delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。

Observable.create(new Observable.OnSubscribe<Long>() {
    @Override
    public void call(Subscriber<? super Long> subscriber) {
        Long currentTime = System.currentTimeMillis() / 1000;
        subscriber.onNext(currentTime);
    }
}).delay(2, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.w("TAG", "delay:" + (System.currentTimeMillis() / 1000 - aLong));
            }
        });

输出结果为:

TAG: delay:2

  • do

    Do系列操作符就是为原始Observable的生命周期事件注册一个回调,当Observable的某个事件发生时就会调用这些回调。

RxJava中有很多Do系列操作符,如下所示。

doOnEach:为 Observable注册这样一个回调,当Observable每发射一项数据时就会调用它一次,包括onNext、onError和 onCompleted。
doOnNext:只有执行onNext的时候会被调用。
doOnSubscribe:当观察者订阅Observable时就会被调用。
doOnUnsubscribe:当观察者取消订阅Observable时就会被调用;Observable通过onError或者onCompleted结束时,会取消订阅所有的Subscriber。
doOnCompleted:当Observable 正常终止调用onCompleted时会被调用。
doOnError:当Observable 异常终止调用onError时会被调用。
doOnTerminate:当Observable 终止(无论是正常终止还是异常终止)之前会被调用。
finallyDo:当Observable 终止(无论是正常终止还是异常终止)之后会被调用。

这里拿doOnNext来举例,代码如下所示:

Observable.just(1, 2)
        .doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", "doOnNext:" + integer);
            }
        }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.w("TAG", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.w("TAG", "onError:" + e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.w("TAG", "onNext:" + integer);
    }
});

输出结果为:

TAG: doOnNext:1
TAG: onNext:1
TAG: doOnNext:2
TAG: onNext:2
TAG: onCompleted

  • subscribeOn、observeOn

    subscribeOn操作符用于指定Observable自身在哪个线程上运行。如果Observable需要执行耗时操作,一般可以让其在新开的一个子线程上运行。

observerOn用来指定Observer所运行的线程,也就是发射出的数据在哪个线程上使用(一般指定UI线程)。

Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        Log.w("TAG", "Observable:" + Thread.currentThread().getName());
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", "Observer:" + Thread.currentThread().getName());
            }
        });

subscribeOn(Schedulers.newThread())表示 Observable 运行在新开的线程,observeOn(AndroidSchedulers.mainThread())表示运行在主线程。

TAG: Observable:RxNewThreadScheduler-1
TAG: Observer:main
  • timeout

    如果原始 Observable 过了指定的一段时长没有发射任何数据,timeout 操作符会以一个onError通知终止这个Observable,或者继续执行一个备用的

Observable。timeout有很多变体,这里介绍其中的一种:timeout(long,TimeUnit,Observable),它在超时时会切换到使用一个你指定的备用的

Observable,而不是发送错误通知。它默认在computation调度器上执行。

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 4; i++) {
            try {
                Thread.sleep(i * 100);
            }catch (Exception e) {
                e.printStackTrace();
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(10, 11));
observable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "timeout:" + integer);
    }
});

如果Observable在200ms这段时长没有发射数据,就会切换到Observable.just(10,11),输出结果为:

TAG: timeout:0
TAG: timeout:1
TAG: timeout:2
TAG: timeout:10
TAG: timeout:11

错误处理操作符

RxJava在错误出现的时候就会调用Subscriber的onError方法将错误分发出去,由Subscriber自己来处理错误。但是如果每个 Subscriber 都处理一遍的话,

工作量就有点大了,这时候可以使用错误处理操作符。错误处理操作符有catch和 retry。

  • catch

    catch操作符拦截原始Observable的onError通知,将它替换为其他数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。RxJava将catch

实现为以下 3个不同的操作符。

• onErrorReturn:Observable遇到错误时返回原有Observable行为的备用Observable,原有Observable的onError调用,不会将错误传递给观察者。作为替代,它会发射一个者的onCompleted方法。
• onErrorResumeNext:Observable遇到错误时返回原有Observable行为的备Observable会忽略原有Observable的onError调用,不会将错误传递给观察者。
• onExceptionResumeNext:它和onErrorResumeNext类似。不同的是,如果onError收一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。

这里拿onErrorReturn操作符来举例,如下所示:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            if (i > 2) {
                subscriber.onError(new Throwable("Throwable"));
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).onErrorReturn(new Func1<Throwable, Integer>() {
    @Override
    public Integer call(Throwable throwable) {
        return 6;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        Log.w("TAG", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.w("TAG", "onError:" + e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.w("TAG", "onNext:" + integer);
    }
});

输出结果为:

TAG: onNext:0
TAG: onNext:1
TAG: onNext:2
TAG: onNext:6
TAG: onCompleted

  • retry

    retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成其数据序列。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            if (i == 1) {
                subscriber.onError(new Throwable("Throwable"));
            }else {
                subscriber.onNext(i);
            }
        }
        subscriber.onCompleted();
    }
}).retry(2).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        Log.w("TAG", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.w("TAG", "onError:" + e.getMessage());
    }
    @Override
    public void onNext(Integer integer) {
        Log.w("TAG", "onNext:" + integer);
    }
});

重新订阅次数为2,输出代码如下:

TAG: onNext:0
TAG: onNext:0
TAG: onNext:0
TAG: onError:Throwable

条件操作符

条件操作符和布尔操作符可用于根据条件发射或变换Observable,或者对它们做布尔运算。

  • 布尔操作符

    布尔操作符有all、contains、isEmpty、exists和sequenceEqual,这里介绍前3个操作符。

(1)all操作符

all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足

我们定义好的判断条件。如果全部都满足则返回true,否则就返回false。

Observable.just(1, 2, 3, 4)
        .all(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                Log.w("TAG", "call:" + integer);
                return integer < 3;
            }
        }).subscribe(new Subscriber<Boolean>() {
    @Override
    public void onCompleted() {
        Log.w("TAG", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.w("TAG", "onError:" + e.getMessage());
    }
    @Override
    public void onNext(Boolean aBoolean) {
        Log.w("TAG", "onNext:" + aBoolean);
    }
});

输出结果为:

TAG: call:1
TAG: call:2
TAG: call:3
TAG: onNext:false
TAG: onCompleted

(2)contains 和isEmpty操作符

contains 操作符用来判断源 Observable 所发射的数据是否包含某一个数据。包含该数据会返回true;否则返回false。

isEmpty操作符用来判断源Observable 是否发射过数据。如果发射过数据,就会返回 false;否则返回true。

Observable.just(1, 2, 3).contains(1).subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        Log.w("TAG", "contains:" + aBoolean);
    }
});
Observable.just(1, 2, 3).isEmpty().subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        Log.w("TAG", "isEmpty:" + aBoolean);
    }
});

输出结果为:

TAG: contains:true
TAG: isEmpty:false

  • 条件操作符

    条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil和takeWhile等,这里介绍前两种。

(1)amb操作符

amb 操作符对于给定两个或多个 Observable,它只发射首先发射数据或通知的那个Observable的所有数据。

Observable<Integer> observable1 = Observable.just(1, 2, 3).delay(2, TimeUnit.SECONDS);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable.amb(observable1, observable2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "amb:" + integer);
    }
});

第一个Observable延时2s发射,所以很显然最终只会发射第二个Observable,输出结果为:

TAG: amb:4
TAG: amb:5
TAG: amb:6

(2)defaultIfEmpty操作符

发射来自原始Observable的数据。如果原始Observable没有发射数据,就发射一个默认数据,如下所示:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onCompleted();
    }
}).defaultIfEmpty(3).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.w("TAG", "defaultIfEmpty:" + integer);
    }
});

可以看出并没有发射数据而是直接调用onCompleted方法来完成事件,输出结果为:

defaultIfEmpty:3

转换操作符

转换操作符用来将 Observable 转换为另一个对象或数据结构,转换操作符有 toList、toSortedList、toMap、toMultiMap、getIterator和nest等,这里介绍前3种。

  • toList

    toList操作符将发射多项数据且为每一项数据调用onNext方法的Observable发射的多项数据组合成一个List,然后调用一次onNext方法传递整个列表。

Observable.just(1, 2, 3).toList().subscribe(new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> integers) {
        for (int integer : integers) {
            Log.w("TAG", "toList:" + integer);
        }
    }
});

输出结果为:

TAG: toList:1
TAG: toList:2
TAG: toList:3

  • toSortedList

    toSortedList操作符类似于toList操作符;不同的是,它会对产生的列表排序,默认是自然升序。

发射的数据项必须实现Comparable接口,没有则抛出异常,也可以使用toSortedList(Func2)变体,其传递的函数参数Func2会作用于比较两个数据项。

Observable.just(3, 1, 2).toSortedList().subscribe(new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> integers) {
        for (int integer : integers) {
            Log.w("TAG", "toSortedList:" + integer);
        }
    }
});

输出结果为:

TAG: toSortedList:1
TAG: toSortedList:2
TAG: toSortedList:3

  • toMap

    toMap操作符收集原始Observable发射的所有数据项到一个Map(默认是HashMap),然后发射这个Map。

Swordsman s1 = new Swordsman("韦一笑", "A");
Swordsman s2 = new Swordsman("张三丰", "SS");
Swordsman s3 = new Swordsman("周芷若", "S");
Observable.just(s1, s2, s3).toMap(new Func1<Swordsman, String>() {
    @Override
    public String call(Swordsman swordsman) {
        return swordsman.getLevel();
    }
}).subscribe(new Action1<Map<String, Swordsman>>() {
    @Override
    public void call(Map<String, Swordsman> stringSwordsmanMap) {
        Log.w("TAG", "toMap:" + stringSwordsmanMap.get("SS").getName());
    }
});

输出结果为:

TAG: toMap:张三丰

Rxjava线程控制

内置Scheduler

如果我们不指定线程,默认是在调用subscribe方法的线程上进行回调的。如果我们想切换线程,就需要使用Scheduler。

RxJava 已经内置了如下5个Scheduler。

Schedulers.immediate():直接在当前线程运行,它是timeout、timeInterval和timestamp操作符的默认调度器。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io():IO操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。(内部有无数量上限的线程池,效率比newThread高)
Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler使用固定线程池,大小为 CPU 核数。
Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以用.trampoline()将它入队。

另外,RxAndroid也提供了一个常用的Scheduler:

AndroidSchedulers.mainThread()—RxAndroid库中提供的Scheduler,它指定的操作在主线程中运行。

注意:在RxJava中用subscribeOn和observeOn操作符来控制线程。

// 子线程中发射源数据,在主线程中打印
Observable.just(1, 2, 3)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.w("TAG", Thread.currentThread().getName() + "--" + integer);
            }
        });

输出结果如下:

TAG: main--1
TAG: main--2
TAG: main--3

Rxjava使用场景

关于RxJava,实际有很多可以使用它的场景,这里讲解RxJava结合OkHttp、Retrofit访问网络及用RxJava实现RxBus。

1、Rxjava + OkHttp

RxJava结合Retrofit访问网络是比较好的搭配,当然RxJava结合OkHttp访问网络也是可以的。

首先创建Observable,代码如下所示:

// Observable
public Observable<String> getObservable(final String ip) {
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(final Subscriber<? super String> subscriber) {
            OkHttpClient okHttpClient = new OkHttpClient();
            RequestBody requestBody = new FormBody.Builder()
                    .add("ip", ip)
                    .build();
            Request request = new Request.Builder()
                    .url("http://ip.taobao.com/service/getIpInfo.php")
                    .post(requestBody)
                    .build();
            Call call = okHttpClient.newCall(request);
            call.enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    subscriber.onError(new Exception("error"));
                }
                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    String str = response.body().string();
                    // 调用subscriber.onNext来将请求返回的数据添加到事件队列中。
                    subscriber.onNext(str);
                    subscriber.onCompleted();
                }
            });
        }
    });
    return observable;
}

接下来实现观察者,如下所示:

// Observer
private void postAsynHttp(String ip) {
    getObservable(ip).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.w("TAG", "onCompleted");
                }
                @Override
                public void onError(Throwable e) {
                    Log.w("TAG", "onError:" + e.getMessage());
                }
                @Override
                public void onNext(String s) {
                    Log.w("TAG", "onNext:" + s);
                    Toast.makeText(getApplicationContext(), "请求成功", Toast.LENGTH_SHORT).show();
                }
            });
}

Rxjava +Retrofit

  • 使用前的准备工作

    首先在build.gradle中配置依赖:

implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
implementation 'com.squareup.retrofit2:retrofit:2.1.0'
implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
implementation 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
implementation 'com.squareup.okhttp3:logging-interceptor:3.3.1'

  • 修改请求网络接口

    Retrofit的请求接口返回的是Call。若结合RxJava使用,则需要把Call改为Observable,代码如下所示:

public interface ApiService {
    @FormUrlEncoded
    @POST("getIpInfo.php")
    Observable<HttpResult<String>> getIpMsg(@Field("ip") String ip);
}
  • 修改请求网络方法
public void postIpInformation(String ip) {
    String url = "http://ip.taobao.com/service/";
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(url)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();
    ApiService apiService = retrofit.create(ApiService.class);
    subscribe = apiService.getIpMsg(ip).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<HttpResult<String>>() {
                @Override
                public void onCompleted() {
                    Log.w("TAG", "onCompleted");
                }
                @Override
                public void onError(Throwable e) {
                    Log.w("TAG", "onError:" + e.getMessage());
                }
                @Override
                public void onNext(HttpResult<String> stringHttpResult) {
                    Log.w("TAG", "onNext:" + stringHttpResult.getCode());
                    Toast.makeText(getApplicationContext(), "请求成功", Toast.LENGTH_SHORT).show();
                }
            });
}

请求返回数据格式封装如下所示:

public class HttpResult<T> {
    private int code;
    private T data;
    ......
}
  • 取消请求

    如果只使用Retrofit,可以用Call的cancel方法来取消请求。在RxJava中可以使用如下代码:

@Override
protected void onStop() {
    super.onStop();
    if (subscribe != null && subscribe.isUnsubscribed()) {
        subscribe.unsubscribe();
    }
}

当然一个界面也可能会用多个请求。如果想取消多个请求,可以使用CompositeSubscription,如下所示:

private CompositeSubscription compositeSubscription = new CompositeSubscription();
...
// 发送请求的地方,将返回的subscript添加到compositeSubscription中
compositeSubscription.add(subscribe);
...
// 取消所有请求操作    
@Override
protected void onStop() {
    super.onStop();
    compositeSubscription.unsubscribe();
}   

Rxjava + RxBus

首先创建RxBus。这里的RxBus只是支持基本的功能,如果想要添加一些功能,则可以自定义添加,代码如下所示:

public class RxBus {
    private static volatile RxBus instance;
    private Subject<Object, Object> bus;
    private RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }
    public static RxBus getDefault() {
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = new RxBus();
            }
        }
        return instance;
    }
    // 发送事件
    public void post(Object object) {
        bus.onNext(object);
    }
    // 根据类型接收相应类型事件
    public <T> Observable toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }
}

接下来创建RxBus发送对象的实体类:

public class EventBean {
    private String name;
    private Integer age;
    public EventBean(String name, Integer age) {
        this.name = name;
        this.age = age;
    }
    ......
}

发送事件的方式如下:

mBtnPost = findViewById(R.id.btn_post);
// 发送事件
mBtnPost.setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        RxBus.getDefault().post(new EventBean("legend", 20));
    }
});

接收事件的方式如下:

// 接收事件
RxBus.getDefault().toObservable(EventBean.class)
        .subscribe(new Action1<EventBean>() {
            @Override
            public void call(EventBean eventBean) {
                Toast.makeText(MainActivity.this, "接收成功", Toast.LENGTH_SHORT).show();
            }
        });
原文地址:https://www.cnblogs.com/pengjingya/p/14952251.html