Rxjava基础
RxJava的原理就是创建一个Observable对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样,把你想要处理的数据一步一步地加工
成你想要的成品,然后发射给Subscriber处理。
RxJava有4个角色Observable、Observer、Subscriber和Suject。Observable和Observer 通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
其中RxAndroid是RxJava在Android平台的扩展。它包含了一些能够简化Android开发的工具,比如特殊的调度器(后文会提到)。
创建Observer(观察者)
它决定事件触发的时候将有怎样的行为,代码如下所示:
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
// 事件队列完结。当不会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
}
@Override
public void onError(Throwable e) {
// 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
}
@Override
public void onNext(String s) {
// 普通的事件。将要处理的事件添加到事件队列中。
}
@Override
public void onStart() {
// 它会在事件还未发送之前被调用,可以用于做一些准备工作。(Rxjava1.0)
}
};
其中onCompleted、onError和onNext是必须要实现的方法,其含义如下:
• onCompleted:事件队列完结。当不会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
• onError:事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
• onNext:普通的事件。将要处理的事件添加到事件队列中。
• onStart:它会在事件还未发送之前被调用,可以用于做一些准备工作。
如果要实现简单的功能,也可以用到Observer来创建观察者。
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
// 事件队列完结。当不会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
}
@Override
public void onError(Throwable e) {
// 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
}
@Override
public void onNext(String s) {
// 普通的事件。将要处理的事件添加到事件队列中。
}
};
创建Observable(被观察者)
它决定什么时候触发事件以及触发怎样的事件。RxJava 使用 create 方法来创建一个Observable,并为它定义事件触发规则,如下所示:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("legend");
subscriber.onNext("vincent");
subscriber.onCompleted();
}
});
通过调用Subscriber的方法,不断地将事件添加到任务队列中。也可用just方法来实现:
Observable<String> observable = Observable.just("legend", "vincent");
还可以用from方法来实现,如下所示:
String[] words = {"legend", "vincent"};
Observable<String> observable = Observable.from(words);
订阅只需要一行代码就可以了,如下所示:
observable.subscribe(subscriber);
Rxjava不完整回调
RxJava还提供了另一种回调方式,也就是不完整回调。在讲到不完整回调之前我们首先要了解Action。
![img](file:///C:/Users/Legend/Documents/My Knowledge/temp/93f7ebed-cb13-44e3-a644-5f4b0aba2100/128/index_files/3bd4cc63-b89f-47b1-b1fa-518a1f0c2568.png)
再打开Action1源码:
public interface Action1<T> extends Action {
void call(T t);
}
最后打开Action9源码看看:
public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}
很明显,Action后的数字代表回调的参数类型数量,所以创建Observer和订阅代码也就可以改写为下面的代码:
// 创建观察者
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.w("TAG:", "onNext" + s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.w("TAG:", "onError");
}
};
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.w("TAG:", "onCompleted");
}
};
// 创建被观察者
Observable<String> observable = Observable.just("legend", "vincent");
// 订阅
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Rxjava的Subject
Subject 既可以是一个 Observer 也可以是一个 Observerable,它是连接 Observer 和Observerable的桥梁。因此,Subject可以被理解为
Subject = Observable + Observer。RxJava提供了以下4种Subject。
-
PublishSubject
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里会有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,
数据可能会丢失。要确保来自原始Observable的所有数据都被分发,则可以当所有观察者都已经订阅时才开始发射数据,或者改用ReplaySubject。
-
BehaviorSubject
当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据。如果此时还没有收到任何数据,它会发射一个默认值,然后继续发射
其他任何来自原始Observable的数据。如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,但是会向Observer传递
一个异常通知。
-
ReplaySubject
不管Observer何时订阅ReplaySubject,ReplaySubject均会发射所有来自原始Observable的数据给Observer。有不同类型的ReplaySubject,它们用于限定
Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。
-
AsyncSubject
当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据。如果原始的Observable 因为发生了错误而终止,AsyncSubject 将不会
发射任何数据,但是会向Observer传递一个异常通知。
Rxjava操作符
RxJava操作符的类型分为创建操作符、变换操作符、过滤操作符、组合操作符、错误处理操作符、辅助操作符、条件和布尔操作符、算术和聚合操作符及连接
操作符等。这里只介绍相对常用的操作符,以及这些操作符的常规用法。
创建操作符
上方我们已经用到创建操作符create、just和from,这里就不赘述它们的用法。
还有defer、range、interval、start、repeat和timer等创建操作符。这里会讲解interval、range和repeat。
-
interval
创建一个按固定时间间隔发射整数序列的Observable,相当于定时器,如下所示:
Observable.interval(3, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.w("TAG", "interval:" + aLong.intValue());
}
});
上面的代码每隔3s就会调用call方法并打印Log。
-
range
创建发射指定范围的整数序列的Observable,可以拿来替代for循环,发射一个范围内的有序整数序列。
第一个参数是起始值,并且不小于0;第二个参数为终值,左闭右开。
Observable.range(0, 5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "interval:" + integer.intValue());
}
});
输出结果为:
TAG: interval:0
TAG: interval:1
TAG: interval:2
TAG: interval:3
TAG: interval:4
-
repeat
创建一个N次重复发射特定数据的Observable,如下所示:
Observable.range(0, 3)
.repeat(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "interval:" + integer.intValue());
}
});
输出结果如下:
TAG: interval:0
TAG: interval:1
TAG: interval:2
TAG: interval:0
TAG: interval:1
TAG: interval:2
变换操作符
变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去。
变换操作符有很多,这里会讲解map、flatMap、cast、concatMap、flatMapIterable、buffer和groupBy。
-
map
map操作符通过指定一个Func对象,将Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable处理。
/**
* 假设我们要访问网络,Host地址时常是变化的,它有时是测试服务器地址,有时可能是正式服务器地址,
* 但是具体界面的URL地址则是不变的。因此,我们可以用map操作符来进行转换字符操作,
*/
final String Host = "http://blog.csdn.net/";
Observable.just("itachi85").map(new Func1<String, String>() {
@Override
public String call(String s) {
return Host + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.w("TAG", "map:" + s);
}
});
-
flatMap、cast
flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦化地放进一个单独的 Observable。
cast 操作符的作用是强制将 Observable 发射的所有数据转换为指定类型。
/**
* 假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host。
*/
final String Host = "http://blog.csdn.net/";
List<String> list = new ArrayList<>();
list.add("itachi85");
list.add("itachi86");
list.add("itachi87");
list.add("itachi88");
Observable.from(list).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
return Observable.just(Host + s);
}
}).cast(String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.w("TAG", "flatMap:" + s);
}
});
首先用ArrayList存储要访问的界面URL,然后通过flatMap转换成Observable。cast操作符将Observable中的数据转换为String类型。输出结果为:
TAG: flatMap:http://blog.csdn.net/itachi85
TAG: flatMap:http://blog.csdn.net/itachi86
TAG: flatMap:http://blog.csdn.net/itachi87
TAG: flatMap:http://blog.csdn.net/itachi88
注意:flatMap的合并允许交叉,也就是说可能会交错地发送事件,最终结果的顺序可能并不是原始Observable发送时的顺序。
-
concatMap
concatMap操作符功能与flatMap操作符一致,但是它解决了flatMap交叉问题,提供了一种能够把发射的值连续在一起的函数,而不是合并它们。
/**
* 假设我们仍旧访问网络,但是要访问同一个Host的多个界面,我们可以使用for循环在每个界面的URL前添加Host。
*/
final String Host = "http://blog.csdn.net/";
List<String> list = new ArrayList<>();
list.add("itachi85");
list.add("itachi86");
list.add("itachi87");
list.add("itachi88");
Observable.from(list).concatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
return Observable.just(Host + s);
}
}).cast(String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.w("TAG", "concatMap:" + s);
}
});
输出如下所示
TAG: concatMap:http://blog.csdn.net/itachi85
TAG: concatMap:http://blog.csdn.net/itachi86
TAG: concatMap:http://blog.csdn.net/itachi87
TAG: concatMap:http://blog.csdn.net/itachi88
-
flatMaplterable
flatMapIterable操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了,如下所示:
Observable.just(1, 2, 3).flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
List<Integer> list = new ArrayList<>();
list.add(integer + 1);
return list;
}
}).cast(Integer.class).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "flatMapIterable:" + integer);
}
});
在上面代码注释1处对每个数都加1,因此输出结果为:
TAG: flatMapIterable:2
TAG: flatMapIterable:3
TAG: flatMapIterable:4
-
buffer
buffer操作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。和 buffer 操作符类似
的还有 window 操作符,只不过 window操作符发射的是Observable而不是数据列表
Observable.just(1,2,3,4,5,6)
.buffer(3)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for (Integer i : integers) {
Log.w("TAG", "buffer:" + i);
}
Log.w("TAG", "-------------------");
}
});
输出结果如下:
TAG: buffer:1
TAG: buffer:2
TAG: buffer:3
TAG: -------------------
TAG: buffer:4
TAG: buffer:5
TAG: buffer:6
TAG: -------------------
-
groupBy
goupBy操作符用于分组元素,将源Observable变换成一个发射Observables的新Observable (分组后的)。它们中的每一个新Observable都发射
一组指定的数据,如下所示:
Swordman s1 = new Swordman("韦一笑", "A");
Swordman s2 = new Swordman("张三丰", "SS");
Swordman s3 = new Swordman("周芷若", "S");
Swordman s4 = new Swordman("宋远桥", "S");
Swordman s5 = new Swordman("殷梨亭", "A");
Swordman s6 = new Swordman("张无忌", "SS");
Swordman s7 = new Swordman("鹤笔翁", "S");
Swordman s8 = new Swordman("宋青书", "A");
Observable<GroupedObservable<String, Swordman>> groupedObservable
= Observable.just(s1, s2, s3, s4, s5, s6, s7, s8)
.groupBy(new Func1<Swordman, String>() {
@Override
public String call(Swordman swordman) {
return swordman.getLevel();
}
});
Observable.concat(groupedObservable).subscribe(new Action1<Swordman>() {
@Override
public void call(Swordman swordman) {
Log.w("TAG", "groupBy:" + swordman.getName() + "---" + swordman.getLevel());
}
});
这里创建了《倚天屠龙记》里的8个武侠,对其按照实力等级进行划分,从高到低依次是SS、S、A。使用groupBy可以帮助我们对某一个key值进行
分组,将相同的key值数据排在一起。在这里我们的key值就是武侠的Level(实力等级)。
TAG: groupBy:韦一笑---A
TAG: groupBy:殷梨亭---A
TAG: groupBy:宋青书---A
TAG: groupBy:张三丰---S
TAG: groupBy:张无忌---S
TAG: groupBy:周芷若---S
TAG: groupBy:宋远桥---S
TAG: groupBy:鹤笔翁---S
过滤操作符
过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。
过滤操作符有很多种,这里会介绍filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst和throttleWithTimeOut。
-
filter
filter操作符是对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者,如下所示。
Observable.just(1, 2, 3, 4).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
// 设定大于2的数字会被返回并被提交给订阅者。
return integer > 2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "filter:" + integer);
}
});
输出结果为:
TAG: filter:3
TAG: filter:4
-
elementAt
elementAt操作符用来返回指定位置的数据。和它类似的有elementAtOrDefault(int,T),其可以允许默认值。具体代码如下所示:
Observable.just(1, 2, 3, 4).elementAt(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "elementAt:" + integer);
}
});
输出结果为:
TAG: elementAt:3
-
distinct
distinct 操作符用来去重,其只允许还没有发射过的数据项通过。和它类似的还有distinctUntilChanged操作符,它用来去掉连续重复的数据。
Observable.just(1, 2, 3, 4, 1).distinct().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "distinct:" + integer);
}
});
输出结果为:
TAG: distinct:1
TAG: distinct:2
TAG: distinct:3
TAG: distinct:4
-
skip、take
skip操作符将源Observable发射的数据过滤掉前n项;而take操作符则只取前n项;另外还有skipLast和takeLast操作符,则是从后面进行过滤操作。
先来看skip操作符,如下所示:
Observable.just(1, 2, 3, 4, 5, 6).skip(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "skip:" + integer);
}
});
输出结果为:
TAG: skip:3
TAG: skip:4
TAG: skip:5
TAG: skip:6
接下来查看take操作符,如下所示:
Observable.just(1, 2, 3, 4, 5, 6).take(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "take:" + integer);
}
});
输出结果为:
TAG: take:1
TAG: take:2
-
ignoreElements*(疑问)*
ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。
Observable.just(1, 2, 3, 4).ignoreElements().subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError");
}
@Override
public void onNext(Integer integer) {
Log.w("TAG", "onNext");
}
});
输出结果为:
TAG: onCompleted
-
throttleFirst(疑问)
throttleFirst操作符则会定期发射这个时间段里源Observable发射的第一个数据,throttleFirst操作符默认在computation 调度器
上执行。和 throttleFirst 操作符类似的有sample操作符,它会定时地发射源Observable最近发射的数据,其他的都会被过滤掉。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
try {
Thread.sleep(100);
}catch (Exception e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).throttleFirst(200, TimeUnit.MICROSECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "throttleFirst:" + integer);
}
});
每隔100ms发射一个数据。throttleFirst操作符设定的时间为200ms,因此,它会发射每隔200ms内的第一个数据,输出结果为:
TAG: throttleFirst:0
TAG: throttleFirst:1
TAG: throttleFirst:2
TAG: throttleFirst:3
TAG: throttleFirst:4
TAG: throttleFirst:5
TAG: throttleFirst:6
TAG: throttleFirst:7
TAG: throttleFirst:8
TAG: throttleFirst:9
输出结果和书本上的输出结果不一致。
-
throttleWithTimeOut
通过时间来限流。源Observable每次发射出来一个数据后就会进行计时。如果在设定好的时间结束前源Observable有新的数据发射
出来,这个数据就会被丢弃,同时throttleWithTimeOut重新开始计时。如果每次都是在计时结束前发射数据,那么这个限流就会走向极
端:只会发射最后一个数据。其默认在 computation调度器上执行。和 throttleWithTimeOut 操作符类似的有deounce 操作符,它不仅可
以使用时间来进行过滤,还可以根据一个函数来进行限流。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
int sleep = 100;
if (i % 3 == 0) {
sleep = 300;
}
try {
Thread.sleep(sleep);
}catch (Exception e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "throttleWithTimeout:" + integer);
}
});
每隔100ms发射一个数据。当发射的数据是3的倍数的时候,下一个数据就延迟到300ms再发射。这里设定的过滤时间是200ms,也就是
说发射间隔小于200ms的数据会被过滤掉。
TAG: throttleWithTimeout:0
TAG: throttleWithTimeout:3
TAG: throttleWithTimeout:6
TAG: throttleWithTimeout:9
组合操作符
组合操作符可以同时处理多个Observable来创建我们所需要的Observable。组合操作符有很多,这里介绍startWith、merge、concat、
zip和combineLastest。
-
startWith
startWith操作符会在源Observable发射的数据前面插上一些数据,如下所示:
Observable.just(3, 4, 5).startWith(1, 2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "startWith:" + integer);
}
});
输出结果为:
TAG: startWith:1
TAG: startWith:2
TAG: startWith:3
TAG: startWith:4
TAG: startWith:5
-
merge
merge操作符将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错。
Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(3, 4, 5);
Observable.merge(observable1, observable2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "merge:" + integer);
}
});
输出结果为:
TAG: merge:3
TAG: merge:4
TAG: merge:5
TAG: merge:1
TAG: merge:2
TAG: merge:3
-
concat
将多个 Obserbavle 发射的数据进行合并发射。concat 严格按照顺序发射数据,前一个Observable没发射完成是不会发射后一个Observable的数据。
Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(3, 4, 5);
Observable.concat(observable1, observable2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "concat:" + integer);
}
});
输出结果为:
TAG: concat:1
TAG: concat:2
TAG: concat:3
TAG: concat:3
TAG: concat:4
TAG: concat:5
-
zip
zip操作符合并两个或者多个Observable发射出的数据项,根据指定的函数变换它们,并发射一个新值。
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<String> observable2 = Observable.just("a", "b", "c");
Observable.zip(observable1, observable2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.w("TAG", "zip:" + s);
}
});
输出结果为:
TAG: zip:1a
TAG: zip:2b
TAG: zip:3c
-
combineLastest
当两个Observable中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<String> observable2 = Observable.just("a", "b", "c");
Observable.combineLatest(observable1, observable2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.w("TAG", "combineLatest:" + s);
}
});
如果其中的一个 Observable 还有数据没有发射,那么 combineLastest 操作符会将两个Observable最新发射的数据组合在一起。
TAG: combineLatest:3a
TAG: combineLatest:3b
TAG: combineLatest:3c
辅助操作符
辅助操作符可以帮助我们更加方便地处理 Observable。辅助操作符包括有很多。在这里介绍delay、Do、subscribeOn、observeOn和timeout。
-
delay
delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。
Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> subscriber) {
Long currentTime = System.currentTimeMillis() / 1000;
subscriber.onNext(currentTime);
}
}).delay(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.w("TAG", "delay:" + (System.currentTimeMillis() / 1000 - aLong));
}
});
输出结果为:
TAG: delay:2
-
do
Do系列操作符就是为原始Observable的生命周期事件注册一个回调,当Observable的某个事件发生时就会调用这些回调。
RxJava中有很多Do系列操作符,如下所示。
doOnEach:为 Observable注册这样一个回调,当Observable每发射一项数据时就会调用它一次,包括onNext、onError和 onCompleted。
doOnNext:只有执行onNext的时候会被调用。
doOnSubscribe:当观察者订阅Observable时就会被调用。
doOnUnsubscribe:当观察者取消订阅Observable时就会被调用;Observable通过onError或者onCompleted结束时,会取消订阅所有的Subscriber。
doOnCompleted:当Observable 正常终止调用onCompleted时会被调用。
doOnError:当Observable 异常终止调用onError时会被调用。
doOnTerminate:当Observable 终止(无论是正常终止还是异常终止)之前会被调用。
finallyDo:当Observable 终止(无论是正常终止还是异常终止)之后会被调用。
这里拿doOnNext来举例,代码如下所示:
Observable.just(1, 2)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "doOnNext:" + integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError:" + e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.w("TAG", "onNext:" + integer);
}
});
输出结果为:
TAG: doOnNext:1
TAG: onNext:1
TAG: doOnNext:2
TAG: onNext:2
TAG: onCompleted
-
subscribeOn、observeOn
subscribeOn操作符用于指定Observable自身在哪个线程上运行。如果Observable需要执行耗时操作,一般可以让其在新开的一个子线程上运行。
observerOn用来指定Observer所运行的线程,也就是发射出的数据在哪个线程上使用(一般指定UI线程)。
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.w("TAG", "Observable:" + Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onCompleted();
}
});
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "Observer:" + Thread.currentThread().getName());
}
});
subscribeOn(Schedulers.newThread())表示 Observable 运行在新开的线程,observeOn(AndroidSchedulers.mainThread())表示运行在主线程。
TAG: Observable:RxNewThreadScheduler-1
TAG: Observer:main
-
timeout
如果原始 Observable 过了指定的一段时长没有发射任何数据,timeout 操作符会以一个onError通知终止这个Observable,或者继续执行一个备用的
Observable。timeout有很多变体,这里介绍其中的一种:timeout(long,TimeUnit,Observable),它在超时时会切换到使用一个你指定的备用的
Observable,而不是发送错误通知。它默认在computation调度器上执行。
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(i * 100);
}catch (Exception e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(10, 11));
observable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "timeout:" + integer);
}
});
如果Observable在200ms这段时长没有发射数据,就会切换到Observable.just(10,11),输出结果为:
TAG: timeout:0
TAG: timeout:1
TAG: timeout:2
TAG: timeout:10
TAG: timeout:11
错误处理操作符
RxJava在错误出现的时候就会调用Subscriber的onError方法将错误分发出去,由Subscriber自己来处理错误。但是如果每个 Subscriber 都处理一遍的话,
工作量就有点大了,这时候可以使用错误处理操作符。错误处理操作符有catch和 retry。
-
catch
catch操作符拦截原始Observable的onError通知,将它替换为其他数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。RxJava将catch
实现为以下 3个不同的操作符。
• onErrorReturn:Observable遇到错误时返回原有Observable行为的备用Observable,原有Observable的onError调用,不会将错误传递给观察者。作为替代,它会发射一个者的onCompleted方法。
• onErrorResumeNext:Observable遇到错误时返回原有Observable行为的备Observable会忽略原有Observable的onError调用,不会将错误传递给观察者。
• onExceptionResumeNext:它和onErrorResumeNext类似。不同的是,如果onError收一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
这里拿onErrorReturn操作符来举例,如下所示:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if (i > 2) {
subscriber.onError(new Throwable("Throwable"));
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 6;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError:" + e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.w("TAG", "onNext:" + integer);
}
});
输出结果为:
TAG: onNext:0
TAG: onNext:1
TAG: onNext:2
TAG: onNext:6
TAG: onCompleted
-
retry
retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成其数据序列。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if (i == 1) {
subscriber.onError(new Throwable("Throwable"));
}else {
subscriber.onNext(i);
}
}
subscriber.onCompleted();
}
}).retry(2).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError:" + e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.w("TAG", "onNext:" + integer);
}
});
重新订阅次数为2,输出代码如下:
TAG: onNext:0
TAG: onNext:0
TAG: onNext:0
TAG: onError:Throwable
条件操作符
条件操作符和布尔操作符可用于根据条件发射或变换Observable,或者对它们做布尔运算。
-
布尔操作符
布尔操作符有all、contains、isEmpty、exists和sequenceEqual,这里介绍前3个操作符。
(1)all操作符
all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足
我们定义好的判断条件。如果全部都满足则返回true,否则就返回false。
Observable.just(1, 2, 3, 4)
.all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
Log.w("TAG", "call:" + integer);
return integer < 3;
}
}).subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError:" + e.getMessage());
}
@Override
public void onNext(Boolean aBoolean) {
Log.w("TAG", "onNext:" + aBoolean);
}
});
输出结果为:
TAG: call:1
TAG: call:2
TAG: call:3
TAG: onNext:false
TAG: onCompleted
(2)contains 和isEmpty操作符
contains 操作符用来判断源 Observable 所发射的数据是否包含某一个数据。包含该数据会返回true;否则返回false。
isEmpty操作符用来判断源Observable 是否发射过数据。如果发射过数据,就会返回 false;否则返回true。
Observable.just(1, 2, 3).contains(1).subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.w("TAG", "contains:" + aBoolean);
}
});
Observable.just(1, 2, 3).isEmpty().subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.w("TAG", "isEmpty:" + aBoolean);
}
});
输出结果为:
TAG: contains:true
TAG: isEmpty:false
-
条件操作符
条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil和takeWhile等,这里介绍前两种。
(1)amb操作符
amb 操作符对于给定两个或多个 Observable,它只发射首先发射数据或通知的那个Observable的所有数据。
Observable<Integer> observable1 = Observable.just(1, 2, 3).delay(2, TimeUnit.SECONDS);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable.amb(observable1, observable2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "amb:" + integer);
}
});
第一个Observable延时2s发射,所以很显然最终只会发射第二个Observable,输出结果为:
TAG: amb:4
TAG: amb:5
TAG: amb:6
(2)defaultIfEmpty操作符
发射来自原始Observable的数据。如果原始Observable没有发射数据,就发射一个默认数据,如下所示:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onCompleted();
}
}).defaultIfEmpty(3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", "defaultIfEmpty:" + integer);
}
});
可以看出并没有发射数据而是直接调用onCompleted方法来完成事件,输出结果为:
defaultIfEmpty:3
转换操作符
转换操作符用来将 Observable 转换为另一个对象或数据结构,转换操作符有 toList、toSortedList、toMap、toMultiMap、getIterator和nest等,这里介绍前3种。
-
toList
toList操作符将发射多项数据且为每一项数据调用onNext方法的Observable发射的多项数据组合成一个List,然后调用一次onNext方法传递整个列表。
Observable.just(1, 2, 3).toList().subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for (int integer : integers) {
Log.w("TAG", "toList:" + integer);
}
}
});
输出结果为:
TAG: toList:1
TAG: toList:2
TAG: toList:3
-
toSortedList
toSortedList操作符类似于toList操作符;不同的是,它会对产生的列表排序,默认是自然升序。
发射的数据项必须实现Comparable接口,没有则抛出异常,也可以使用toSortedList(Func2)变体,其传递的函数参数Func2会作用于比较两个数据项。
Observable.just(3, 1, 2).toSortedList().subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for (int integer : integers) {
Log.w("TAG", "toSortedList:" + integer);
}
}
});
输出结果为:
TAG: toSortedList:1
TAG: toSortedList:2
TAG: toSortedList:3
-
toMap
toMap操作符收集原始Observable发射的所有数据项到一个Map(默认是HashMap),然后发射这个Map。
Swordsman s1 = new Swordsman("韦一笑", "A");
Swordsman s2 = new Swordsman("张三丰", "SS");
Swordsman s3 = new Swordsman("周芷若", "S");
Observable.just(s1, s2, s3).toMap(new Func1<Swordsman, String>() {
@Override
public String call(Swordsman swordsman) {
return swordsman.getLevel();
}
}).subscribe(new Action1<Map<String, Swordsman>>() {
@Override
public void call(Map<String, Swordsman> stringSwordsmanMap) {
Log.w("TAG", "toMap:" + stringSwordsmanMap.get("SS").getName());
}
});
输出结果为:
TAG: toMap:张三丰
Rxjava线程控制
内置Scheduler
如果我们不指定线程,默认是在调用subscribe方法的线程上进行回调的。如果我们想切换线程,就需要使用Scheduler。
RxJava 已经内置了如下5个Scheduler。
Schedulers.immediate():直接在当前线程运行,它是timeout、timeInterval和timestamp操作符的默认调度器。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io():IO操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。(内部有无数量上限的线程池,效率比newThread高)
Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler使用固定线程池,大小为 CPU 核数。
Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以用.trampoline()将它入队。
另外,RxAndroid也提供了一个常用的Scheduler:
AndroidSchedulers.mainThread()—RxAndroid库中提供的Scheduler,它指定的操作在主线程中运行。
注意:在RxJava中用subscribeOn和observeOn操作符来控制线程。
// 子线程中发射源数据,在主线程中打印
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.w("TAG", Thread.currentThread().getName() + "--" + integer);
}
});
输出结果如下:
TAG: main--1
TAG: main--2
TAG: main--3
Rxjava使用场景
关于RxJava,实际有很多可以使用它的场景,这里讲解RxJava结合OkHttp、Retrofit访问网络及用RxJava实现RxBus。
1、Rxjava + OkHttp
RxJava结合Retrofit访问网络是比较好的搭配,当然RxJava结合OkHttp访问网络也是可以的。
首先创建Observable,代码如下所示:
// Observable
public Observable<String> getObservable(final String ip) {
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
OkHttpClient okHttpClient = new OkHttpClient();
RequestBody requestBody = new FormBody.Builder()
.add("ip", ip)
.build();
Request request = new Request.Builder()
.url("http://ip.taobao.com/service/getIpInfo.php")
.post(requestBody)
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
subscriber.onError(new Exception("error"));
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String str = response.body().string();
// 调用subscriber.onNext来将请求返回的数据添加到事件队列中。
subscriber.onNext(str);
subscriber.onCompleted();
}
});
}
});
return observable;
}
接下来实现观察者,如下所示:
// Observer
private void postAsynHttp(String ip) {
getObservable(ip).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
Log.w("TAG", "onNext:" + s);
Toast.makeText(getApplicationContext(), "请求成功", Toast.LENGTH_SHORT).show();
}
});
}
Rxjava +Retrofit
-
使用前的准备工作
首先在build.gradle中配置依赖:
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
implementation 'com.squareup.retrofit2:retrofit:2.1.0'
implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
implementation 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
implementation 'com.squareup.okhttp3:logging-interceptor:3.3.1'
-
修改请求网络接口
Retrofit的请求接口返回的是Call。若结合RxJava使用,则需要把Call改为Observable,代码如下所示:
public interface ApiService {
@FormUrlEncoded
@POST("getIpInfo.php")
Observable<HttpResult<String>> getIpMsg(@Field("ip") String ip);
}
- 修改请求网络方法
public void postIpInformation(String ip) {
String url = "http://ip.taobao.com/service/";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(url)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
ApiService apiService = retrofit.create(ApiService.class);
subscribe = apiService.getIpMsg(ip).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<HttpResult<String>>() {
@Override
public void onCompleted() {
Log.w("TAG", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w("TAG", "onError:" + e.getMessage());
}
@Override
public void onNext(HttpResult<String> stringHttpResult) {
Log.w("TAG", "onNext:" + stringHttpResult.getCode());
Toast.makeText(getApplicationContext(), "请求成功", Toast.LENGTH_SHORT).show();
}
});
}
请求返回数据格式封装如下所示:
public class HttpResult<T> {
private int code;
private T data;
......
}
-
取消请求
如果只使用Retrofit,可以用Call的cancel方法来取消请求。在RxJava中可以使用如下代码:
@Override
protected void onStop() {
super.onStop();
if (subscribe != null && subscribe.isUnsubscribed()) {
subscribe.unsubscribe();
}
}
当然一个界面也可能会用多个请求。如果想取消多个请求,可以使用CompositeSubscription,如下所示:
private CompositeSubscription compositeSubscription = new CompositeSubscription();
...
// 发送请求的地方,将返回的subscript添加到compositeSubscription中
compositeSubscription.add(subscribe);
...
// 取消所有请求操作
@Override
protected void onStop() {
super.onStop();
compositeSubscription.unsubscribe();
}
Rxjava + RxBus
首先创建RxBus。这里的RxBus只是支持基本的功能,如果想要添加一些功能,则可以自定义添加,代码如下所示:
public class RxBus {
private static volatile RxBus instance;
private Subject<Object, Object> bus;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getDefault() {
if (instance == null) {
synchronized (RxBus.class) {
instance = new RxBus();
}
}
return instance;
}
// 发送事件
public void post(Object object) {
bus.onNext(object);
}
// 根据类型接收相应类型事件
public <T> Observable toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
接下来创建RxBus发送对象的实体类:
public class EventBean {
private String name;
private Integer age;
public EventBean(String name, Integer age) {
this.name = name;
this.age = age;
}
......
}
发送事件的方式如下:
mBtnPost = findViewById(R.id.btn_post);
// 发送事件
mBtnPost.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBus.getDefault().post(new EventBean("legend", 20));
}
});
接收事件的方式如下:
// 接收事件
RxBus.getDefault().toObservable(EventBean.class)
.subscribe(new Action1<EventBean>() {
@Override
public void call(EventBean eventBean) {
Toast.makeText(MainActivity.this, "接收成功", Toast.LENGTH_SHORT).show();
}
});