RXJAVA之聚合操作

concat

  按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)。

startWith

  在数据序列的开头增加一项数据。startWith的内部也是调用了concat。

merge

  将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。

reduce

 对发射的数据进行处理,返回最终的处理结果

Observable.just(123, 456,789).reduce(new BiFunction<Integer,Integer,Integer>(){

@Override

public Integer apply(Integer t1, Integer t2) throws Exception {

return t1+t2;

}});

t1为上一次计算的结果,第一个值为第一个发射的数据。t2为新的数据。  

Observable.just("123", "456","789").reduce(123, new BiFunction<Integer,String,Integer>(){

@Override

public Integer apply(Integer t1, String t2) throws Exception {

return t1+Integer.parseInt(t2);

}

});

t1为上一次计算的结果,第一个值为reduce指定的值,t2为新的数据。

原文地址:https://www.cnblogs.com/zhangwanhua/p/7592072.html