rxjava-基本内容解析

2.4 RxJava系列7(最佳实践)
2.3 RxJava系列6(从微观角度解读RxJava源码)
2.2 RxJava系列2(基本概念及使用介绍)
2.1 RxJava系列1(简介)

1.1Rxjava之变换操作符

=========

2.4 RxJava系列7(最佳实践)
2.3 RxJava系列6(从微观角度解读RxJava源码)
2.2 RxJava系列2(基本概念及使用介绍)
2.1 RxJava系列1(简介)

2.4 RxJava系列7(最佳实践)

前言

 因此这篇文章只是简单的通过两个例子介绍了RxJava在生产环境中的使用。不过本篇中的每个例子我都配上了完整的代码。

按照计划这一期是要介绍RxJava框架结构和设计思想的,但是考虑到Netflix将在十月底发布RxJava2.0正式版;因此决定将RxJava框架结构和设计思想分析放到2.0正式版发布后再做。后续我也会有一系列的文章来介绍RxJava1.x和2.x的区别。

示例一、获取手机上已安装的App

第一个例子 这里我主要讲讲如何通过RxJava实现核心功能。

首选我们需要调用系统api来获取所有已安装的app,所以在OnSubscribecall方法中调用getApplicationInfoList()

但是getApplicationInfoList()获取的数据并不能完全满足我们的业务需求:

  1. 由于我们只需要展示手机上已安装的第三方App,因此需要通过filter操作符来过滤掉系统app;
  2. ApplicationInfo并不是我们所需要的类型,因此需要通过map操作符将其转换为AppInfo
  3. 由于获取ApplicationInfo、过滤数据、转换数据相对比较耗时,因此需要通过subscribeOn操作符将这一系列操作放到子线程中来处理;
  4. 而要将信息展示在页面上涉及到UI操作,因此需要通过observeOn操作符将onNextonCompletedonError调度到主线程,接着我们在这些方法中更新UI。

下面是核心代码:

final PackageManager pm = MainActivity.this.getPackageManager();
Observable.create(new Observable.OnSubscribe<ApplicationInfo>() {
        @Override
        public void call(Subscriber<? super ApplicationInfo> subscriber) {
            List<ApplicationInfo> infoList = getApplicationInfoList(pm);
            for (ApplicationInfo info : infoList) {
                subscriber.onNext(info);
            }
            subscriber.onCompleted();
        }
    }).filter(new Func1<ApplicationInfo, Boolean>() {
        @Override
        public Boolean call(ApplicationInfo applicationInfo) {
            return (applicationInfo.flags & ApplicationInfo.FLAG_SYSTEM) <= 0;
        }
    }).map(new Func1<ApplicationInfo, AppInfo>() {

        @Override
        public AppInfo call(ApplicationInfo applicationInfo) {
            AppInfo info = new AppInfo();
            info.setAppIcon(applicationInfo.loadIcon(pm));
            info.setAppName(applicationInfo.loadLabel(pm).toString());
            return info;
        }
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<AppInfo>() {
        @Override
        public void onCompleted() {
            mAppListAdapter.notifyDataSetChanged();
            mPullDownSRL.setRefreshing(false);
        }
 

        @Override
        public void onNext(AppInfo appInfo) {
            mAppInfoList.add(appInfo);
        }
    });

示例二、RxJava+Retrofit2实现获取天气数据

RxJava + Retrofit2几乎是Android应用开发的标配了,这个例子中我们就来聊聊这二者是如何配合起来帮助我们快速开发的。

Retrofit2中一个标准的接口定义是这样的:

@GET("weather")
Observable<Weather> getWeather(@Query("cityId") String cityId);

现在有了RxJava,一个基本的网络请求我们便可以这样实现:

ApiClient.weatherService.getWeather(cityId)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Weather>() {
                    @Override
                    public void call(Weather weather) {
                        weatherView.displayWeatherInformation(weather);
                    }
                });

但有时候可能一开始我们并不知道cityId,我们只知道cityName。所以就需要我们先访问服务器,拿到对应城市名的cityId,然后通过这个cityId再去获取天气数据。

同样的,我们需要定义一个获取cityId的接口:

@GET("city")
Observable<String> getCityIdByName(@Query("cityName") String cityName);

紧接着我们便可以使用无所不能的RxJava来实现需求了。

ApiClient.weatherService.getCityIdByName("上海")
             .flatMap(new Func1<String, Observable<Weather>>() {
                 @Override
                 public Observable<Weather> call(String cityId) {
                     return ApiClient.weatherService.getWeather(cityId);
                 }
             }).subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Action1<Weather>() {
                 @Override
                 public void call(Weather weather) {
                     weatherView.displayWeatherInformation(weather);
                 }
             });

WeatherStyle这个项目还在开发中,这个项目不只包含了RxJava和Retrofit的使用,同时还包含MVP、ORMLite、RetroLambda、ButterKnife等等开源库的使用

RxJava1.X的系列文章就到此结束了,由于本人对RxJava的理解有限,这一系列文章中如有错误还请大家指正。在使用RxJava过程中有任何疑问也欢迎大家和我交流。共同学习!共同进步!

好啦,我们RxJava2见!~


 
2.3 RxJava系列6(从微观角度解读RxJava源码)

前言

通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;

所以从这一章开始我们聊聊源码,分析RxJava的实现原理。本文我们主要从三个方面来分析RxJava的实现:

  • RxJava基本流程分析
  • 操作符原理分析
  • 线程调度原理分析

本章节基于RxJava1.1.9版本的源码

一、RxJava执行流程分析

RxJava系列2(基本概念及使用介绍)中我们介绍过,一个最基本的RxJava调用是这样的:

示例A

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

首先调用Observable.create()创建一个被观察者Observable,同时创建一个OnSubscribe作为create()方法的入参;

接着创建一个观察者Subscriber,然后通过subseribe()实现二者的订阅关系。 这里涉及到三个关键对象和一个核心的方法:

  • Observable(被观察者)
  • OnSubscribe (从纯设计模式的角度来理解,OnSubscribe.call()可以看做是观察者模式中被观察者用来通知观察者的 notifyObservers()方法)
  • Subscriber (观察者)
  • subscribe() (实现观察者与被观察者订阅关系的方法)

1、Observable.create()源码分析

首先我们来看看Observable.create()的实现:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

这里创建了一个被观察者Observable,同时将RxJavaHooks.onCreate(f)作为构造函数的参数,源码如下:

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

我们看到源码中直接将参数RxJavaHooks.onCreate(f)赋值给了当前我们构造的被观察者Observable的成员变量onSubscribe。那么RxJavaHooks.onCreate(f)返回的又是什么呢?我们接着往下看:

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

由于我们并没调用RxJavaHooks.initCreate(),所以上面代码中的onObservableCreate为null;因此RxJavaHooks.onCreate(f)最终返回的就是f,也就是我们在Observable.create()的时候new出来的OnSubscribe。(由于对RxJavaHooks的理解并不影响我们对RxJava执行流程的分析,因此在这里我们不做进一步的探讨。为了方便理解我们只需要知道RxJavaHooks一系列方法的返回值就是入参本身就OK了,例如这里的RxJavaHooks.onCreate(f)返回的就是f)。

至此我们做下逻辑梳理:Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe

2、Subscriber源码分析

接着我们看下观察者Subscriber的源码,为了增加可读性,我去掉了源码中的注释和部分代码。

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    private final SubscriptionList subscriptions;//订阅事件集,所有发送给当前Subscriber的事件都会保存在这里
    
    ...

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    ...

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    public void onStart() {
    }
    
    ...
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

Subscriber实现了Subscription接口,从而对外提供isUnsubscribed()unsubscribe()方法。前者用于判断是否已经取消订阅;后者用于将订阅事件列表(也就是当前观察者的成员变量subscriptions)中的所有Subscription取消订阅,并且不再接受观察者Observable发送的后续事件。

3、subscribe()源码分析

前面我们分析了观察者和被观察者相关的源码,那么接下来便是整个订阅流程中最最关键的环节了。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    ...

    subscriber.onStart();
    
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

subscribe()方法中将传进来的subscriber包装成了SafeSubscriberSafeSubscriber其实是subscriber的一个代理,对subscriber的一系列方法做了更加严格的安全校验。保证了onCompleted()onError()只会有一个被执行且只执行一次,一旦它们其中方法被执行过后onNext()就不在执行了。

上述代码中最关键的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。这里的RxJavaHooks和之前提到的一样,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二个入参observable.onSubscribe,也就是当前observable的成员变量onSubscribe。而这个成员变量我们前面提到过,它是我们在Observable.create()的时候new出来的。所以这段代码可以简化为onSubscribe.call(subscriber)。这也印证了我在RxJava系列2(基本概念及使用介绍)中说的,onSubscribe.call(subscriber)中的subscriber正是我们在subscribe()方法中new出来的观察者。

到这里,我们对RxJava的执行流程做个总结:首先我们调用crate()创建一个观察者,同时创建一个OnSubscribe作为该方法的入参;接着调用subscribe()来订阅我们自己创建的观察者Subscriber
一旦调用subscribe()方法后就会触发执行OnSubscribe.call()。然后我们就可以在call方法调用观察者subscriberonNext(),onCompleted(),onError()

最后我用张图来总结下之前的分析结果:

 
RxJava基本流程分析

二、操作符原理分析

之前我们介绍过几十个操作符,要一一分析它们的源码显然不太现实。在这里我抛砖引玉,选取一个相对简单且常用的map操作符来分析。

我们先来看一个map操作符的简单应用:

示例B

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
}).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return "This is " + integer;
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

为了便于表述,我将上面的代码做了如下拆解:

Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});

Subscriber<String> subscriberOne = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};

Observable<String> observableB = 
        observableA.map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "This is " + integer;;
                }
            });

observableB.subscribe(subscriberOne);

map()的源码和上一小节介绍的create()一样位于Observable这个类中。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

通过查看源码我们发现调用map()的时候实际上是创建了一个新的被观察者Observable,我们姑且称它为ObservableB;一开始通过Observable.create()创建的Observable我们称之为ObservableA。在创建ObservableB的时候同时创建了一个OnSubscribeMap,而ObservableA和变换函数Func1则作为构造OnSubscribeMap的参数。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;//ObservableA
    
    final Func1<? super T, ? extends R> transformer;//map操作符中的转换函数Func1。T为转换前的数据类型,在上面的例子中为Integer;R为转换后的数据类型,在该例中为String。

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    
    @Override
    public void call(final Subscriber<? super R> o) {//结合第一小节的分析结果,我们知道这里的入参o其实就是我们自己new的观察者subscriberOne。
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;//这里的actual就是我们在调用subscribe()时创建的观察者mSubscriber
        final Func1<? super T, ? extends R> mapper;//变换函数
        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            ...
            actual.onError(e);
        }
        
        @Override
        public void onCompleted() {
            ...
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

OnSubscribeMap实现了OnSubscribe接口,因此OnSubscribeMap就是一个OnSubscribe

在调用map()的时候创建了一个新的被观察者ObservableB,然后我们用ObservableB.subscribe(subscriberOne)订阅了观察者subscriberOne。结合我们在第一小节的分析结果,所以OnSubscribeMap.call(o)中的o就是subscribe(subscriberOne)中的subscriberOne;一旦调用了ObservableB.subscribe(subscriberOne)就会执行OnSubscribeMap.call()

call()方法中,首先通过我们的观察者o和转换函数transformer构造了一个MapSubscriber

最后调用了source也就是observableAunsafeSubscribe()方法。即observableA订阅了一个观察者MapSubscriber

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        ...
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

上面这段代码最终执行了onSubscribe也就是OnSubscribeMapcall()方法,call()方法中的参数就是之前在OnSubscribeMap.call()中new出来的MapSubscriber。最后在call()方法中执行了我们自己的业务代码:

subscriber.onNext(1);
subscriber.onCompleted();

其实也就是执行了MapSubscriberonNext()onCompleted()

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        ...
        return;
    }
    actual.onNext(result);
}

onNext(T t)方法中的的mapper就是变换函数,actual就是我们在调用subscribe()时创建的观察者subscriberOne。这个T就是我们例子中的IntegerR就是String。在onNext()中首先调用变换函数mapper.call()T转换成R(在我们的例子中就是将Integer类型的1转换成了String类型的“This is 1”);接着调用subscriberOne.onNext(String result)。同样在调用MapSubscriber.onCompleted()时会执行subscriberOne.onCompleted()。这样就完成了一直完成的调用流程。

我承认太啰嗦了,花费了这么大的篇幅才将map()的转换原理解释清楚。我也是希望尽量的将每个细节都呈现出来方便大家理解,如果看我啰嗦了这么久还是没能理解,请看下面我画的这张执行流程图。

 
加入Map操作符后的执行流程

三、线程调度原理分析

在前面的文章中我介绍过RxJava可以很方便的通过subscribeOn()observeOn()来指定数据流的每一部分运行在哪个线程。其中subscribeOn()指定了处理Observable的全部的过程(包括发射数据和通知)的线程;observeOn()指定了观察者的onNext(), onError()onCompleted()执行的线程。接下来我们就分析分析源码,看看线程调度是如何实现的。

在分析源码前我们先看看一段常见的通过RxJava实现的线程调度代码:

示例C

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

1、subscribeOn()源码分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

通过上面的代码我们可以看到,subscribeOn()map()一样是创建了一个新的被观察者Observable。因此我大致就能猜到subscribeOn()的执行流程应该和map()差不多,OperatorSubscribeOn肯定也是一个OnSubscribe。那我们接下来就看看OperatorSubscribeOn的源码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;//线程调度器,用来指定订阅事件发送、处理等所在的线程
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                source.unsafeSubscribe(s);
            }
        });
    }
}

OperatorSubscribeOn实现了OnSubscribe接口,call()中对Subscriber的处理也和OperatorMapSubscriber的处理类似。首先通过scheduler构建了一个Worker;然后用传进来的subscriber构造了一个新的Subscriber s,并将s丢到Worker.schedule()中来处理;最后用原Observable去订阅观察者s。而这个Worker就是线程调度的关键!前面的例子中我们通过subscribeOn(Schedulers.io())指定了Observable发射处理事件以及通知观察者的一系列操作的执行线程,正是通过这个Schedulers.io()创建了我们前面提到的Worker。所以我们来看看Schedulers.io()的实现。

首先通过Schedulers.io()获得了ioScheduler并返回,上面的OperatorSubscribeOn通过这个的SchedulercreateWorker()方法创建了我们前面提到的Worker

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

接着我们看看这个ioScheduler是怎么来的,下面的代码向我们展现了是如何在Schedulers的构造函数中通过RxJavaSchedulersHook.createIoScheduler()来初始化ioScheduler的。

private Schedulers() {

    ...

    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }

    ...
}

最终RxJavaSchedulersHook.createIoScheduler()返回了一个CachedThreadScheduler,并赋值给了ioScheduler

public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    ...
    return new CachedThreadScheduler(threadFactory);
}

到这一步既然我们知道了ioScheduler就是一个CachedThreadScheduler,那我们就来看看它的createWorker()的实现。

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

上面的代码向我们赤裸裸的呈现了前面OperatorSubscribeOn中的Worker其实就是EventLoopWorker。我们重点要关注的是他的scheduleActual()

static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
    private final CompositeSubscription innerSubscription = new CompositeSubscription();
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
    final AtomicBoolean once;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.once = new AtomicBoolean();
        this.threadWorker = pool.get();
    }

    ...

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        ...
        ScheduledAction s = threadWorker.scheduleActual(new Action0() {
            @Override
            public void call() {
                if (isUnsubscribed()) {
                    return;
                }
                action.call();
            }
        }, delayTime, unit);
        innerSubscription.add(s);
        s.addParent(innerSubscription);
        return s;
    }
}

通过对源码的一步步追踪,我们知道了前面OperatorSubscribeOn.call()中的inner.schedule()最终会执行到ThreadWorkerscheduleActual()方法。

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}

scheduleActual()中的ScheduledAction实现了Runnable接口,通过线程池executor最终实现了线程切换。上面便是subscribeOn(Schedulers.io())实现线程切换的全部过程。

2、observeOn()源码分析

observeOn()切换线程是通过lift来实现的,相比subscribeOn()在实现原理上相对复杂些。不过本质上最终还是创建了一个新的Observable

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

OperatorObserveOn作为OnSubscribeLift构造函数的参数用来创建了一个新的OnSubscribeLift对象,接下来我们看看OnSubscribeLift的实现:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

OnSubscribeLift继承自OnSubscribe,通过前面的分析我们知道一旦调用了subscribe()将观察者与被观察绑定后就会触发被观察者所对应的OnSubscribecall()方法,所以这里会触发OnSubscribeLift.call()。在call()中调用了OperatorObserveOn.call()并返回了一个新的观察者Subscriber st,接着调用了前一级Observable对应OnSubscriber.call(st)

我们再看看OperatorObserveOn.call()的实现:

public Subscriber<? super T> call(Subscriber<? super T> child) {
    ...
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
}

OperatorObserveOn.call()中创建了一个ObserveOnSubscriber并调用init()进行了初始化。

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

    ...

    @Override
    public void onNext(final T t) {
        ...
        schedule();
    }

    @Override
    public void onCompleted() {
        ...
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        ...
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;
        final NotificationLite<T> localOn = this.on;
        
        for (;;) {
            long requestAmount = requested.get();
            
            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;
                
                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }
                
                if (empty) {
                    break;
                }
                
                localChild.onNext(localOn.getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }
            
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
    
    ...
}

ObserveOnSubscriber继承自Subscriber,并实现了Action0接口。我们看到ObserveOnSubscriberonNext()onCompleted()onError()都有个schedule(),这个方法就是我们线程调度的关键;通过schedule()将新观察者ObserveOnSubscriber发送给subscriberOne的所有事件都切换到了recursiveScheduler所对应的线程,简单的说就是把subscriberOneonNext()onCompleted()onError()方法丢到了recursiveScheduler对应的线程中来执行。

那么schedule()又是如何做到这一点的呢?他内部调用了recursiveScheduler.schedule(this)recursiveScheduler其实就是一个Worker,和我们在介绍subscribeOn()时提到的worker一样,执行schedule()实际上最终是创建了一个runable,然后把这个runnable丢到了特定的线程池中去执行。在runnablerun()方法中调用了ObserveOnSubscriber.call(),看上面的代码大家就会发现在call()方法中最终调用了subscriberOneonNext()onCompleted()onError()方法。这便是它实现线程切换的原理。

好了,我们最后再看看示例C对应的执行流程图,帮助大家加深理解。

 
RxJava执行流程

总结

这一章以执行流程操作符实现以及线程调度三个方面为切入点剖析了RxJava源码。下一章将站在更宏观的角度来分析整个RxJava的框架结构、设计思想等等。敬请期待~~ :)

 
 
2.2 RxJava系列2(基本概念及使用介绍)

前言

上一篇的示例代码中大家一定发现了Observable这个类。从纯Java的观点看,Observable类源自于经典的观察者模式。

RxJava的异步实现正是基于 观察者模式来实现的,而且是一种扩展的观察者模式。

观察者模式

  观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫做主题或者被观察者。当它改变时那些由它保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。

观察者模式很适合下面这些场景中的任何一个:

  1. 当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响 或者是独立复用它们时。
  2. 当一个变化的对象,通知那些与它自身变化相关联的未知数量的对象时。
  3. 当一个变化的对象通知,那些无需推断具体类型的对象时。

通常一个观察者模式的类图是这样的:

 
Observer

如果你对观察者模式不是很了解,那么强烈建议你先去学习下。关于观察者模式的详细介绍可以参考我之前的文章:设计模式之观察者模式

扩展的观察者模式

在RxJava中主要有4个角色:

  • Observable
  • Subject
  • Observer
  • Subscriber

      Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而ObserverSubscriber对应于观察者模式中的观察者Subscriber其实是一个实现了Observer的抽象类,

后面我们分析源码的时候也会介绍到。Subject比较复杂,以后再分析。

  上一篇文章中我们说到RxJava中有个关键概念:事件。观察者Observer和被观察者Observable通过subscribe()方法实现订阅关系。从而Observable 可以在需要的时候发出事件来通知Observer

RxJava如何使用

我自己在学习一种新技术的时候通常喜欢先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么我们现在就来说说RxJava到底该怎么用。

第一步:创建观察者Observer

Observer<Object> observer = new Observer<Object>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object s) {

    }
 };

这么简单,一个观察者Observer创建了!

大兄弟你等等...,你之前那篇观察者模式中不是说观察者只提供一个update方法的吗? 这特么怎么有三个?!!

 在普通的观察者模式中观察者一般只会提供一个update()方法用于被观察者的状态发生变化时,用于提供给被观察者调用。

而在RxJava中的观察者Observer提供了:onNext()onCompleted()onError()三个方法。还记得吗?开篇我们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。ps:onNext就相当于普通观察者模式中的update

RxJava中添加了普通观察者模式缺失的三个功能:

  1. RxJava中规定当不再有新的事件发出时,可以调用onCompleted()方法作为标示;
  2. 当事件处理出现异常时框架自动触发onError()方法;
  3. 同时Observables支持链式调用,从而避免了回调嵌套的问题。

第二步:创建被观察者Observable

   Observable.create()方法可以创建一个Observable,使用crate()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。

当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。

Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
});

除了create(),just()和from()同样可以创建Observable。看看下面两个例子:

just(T...)将传入的参数依次发送

Observable observable = Observable.just("One", "Two", "Three");
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送

String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

第三步:被观察者Observable订阅观察者Observerps:你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者

有了观察者和被观察者,我们就可以通过subscribe()来实现二者的订阅关系了。

observable.subscribe(observer);
 
observable.subscribe(observer)

连在一起写就是这样:

Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }

}).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

至此一个完整的RxJava调用就完成了。

兄台,你叨逼叨叨逼叨的说了一大堆,可是我没搞定你特么到底在干啥啊?!!不急,我现在就来告诉你们到底发生了什么。

首先我们使用Observable.create()创建了一个新的Observable<Integer>,并为create()方法传入了一个OnSubscribe,OnSubscribe中包含一个call()方法,

一旦我们调用subscribe()订阅后就会自动触发call()方法。call()方法中的参数Subscriber其实就是subscribe()方法中的观察者Observer。

我们在call()方法中调用了5次onNext()和1次onCompleted()方法。一套流程周下来以后输出结果就是下面这样的:

Item is 0
Item is 1
Item is 2
Item is 3
Item is 4
onCompleted

看到这里可能你又要说了,大兄弟你别唬我啊!OnSubscribe的call()方法中的参数Subscriber怎么就变成了subscribe()方法中的观察者Observer?!!!这俩儿货明明看起来就是两个不同的类啊。

我们先看看Subscriber这个类:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    ...
}

从源码中我们可以看到,Subscriber是Observer的一个抽象实现类,所以我首先可以肯定的是Subscriber和Observer类型是一致的。

接着往下我们看看subscribe()这个方法:

public final Subscription subscribe(final Observer<? super T> observer) {

    //这里的if判断对于我们要分享的问题没有关联,可以先无视
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

    });
}

我们看到subscribe()方法内部首先将传进来的Observer做了一层代理,将它转换成了Subscriber。我们再看看这个方法内部的subscribe()方法:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

进一步往下追踪看看return后面这段代码到底做了什么。精简掉其他无关代码后的subscribe(subscriber, this)方法是这样的:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    subscriber.onStart();
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        return Subscriptions.unsubscribed();
    }
}

我们重点看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),

前面这个hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括号内的第二个参数observable.onSubscribe,

然后调用了它的call方法。 而这个observable.onSubscribe正是create()方法中的Subscriber,这样整个流程就理顺了。

看到这里是不是对RxJava的执行流程清晰了一点呢?这里也建议大家在学习新技术的时候多去翻一翻源码,知其然还要能知其所以然不是吗。

subscribe()的参数除了可以是Observer和Subscriber以外还可以是Action1、Action0; 这是一种更简单的回调,只有一个call(T)方法;由于太简单这里就不做详细介绍了!

异步

上一篇文章中开篇就讲到RxJava就是来处理异步任务的。但是默认情况下我们在哪个线程调用subscribe()就在哪个线程生产事件,

在哪个线程生产事件就在哪个线程消费事件。那怎么做到异步呢?

RxJava为我们 提供Scheduler用来做线程调度,我们来看看RxJava提供了哪些Scheduler。


 
 

同时RxJava还为我们提供了subscribeOn()observeOn()两个方法来指定Observable和Observer运行的线程。

Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

subscribeOn(Schedulers.io())指定了获取小区列表、处理房源信息等一系列事件都是在IO线程中运行,

observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示房源的操作在UI线程执行。这就做到了在子线程获取房源,主线程展示房源。

好了,RxJava系列的入门内容我们就聊到这。下一篇我们再继续介绍更多的API以及它们内部的原理。

 

2.1 RxJava系列1(简介)

提升开发效率,降低维护成本一直是开发团队永恒不变的宗旨。 经过一段时间的学习和探索之后我也深深的感受到了RxJava的魅力。

它能帮助我们简化代码逻辑,提升代码可读性。这对于开发效率的提升、后期维护成本的降低帮助都是巨大的。

个人预测RxJava一定是2016年的一个大趋势,所以也有打算将它引入到公司现有的项目中来,写这一系列的文章主要也是为了团队内部做技术分享。

响应式编程

在介绍RxJava前,我们先聊聊响应式编程。那么什么是响应式编程呢?

   响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

响应式编程的一个关键概念是事件。 事件可以被等待,可以触发过程,也可以触发其它事件。

事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们的天气app从服务端获取到新的天气数据后,我们需要更新app上展示天气信息的UI;汽车上的车道偏移系统探测到车辆偏移了正常路线就会提醒驾驶者纠正,就是是响应事件。

今天,响应式编程最通用的一个场景是UI:我们的移动App必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。

RxJava的来历

Rx是微软.Net的一个响应式扩展,Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。2012年Netflix为了应对不断增长的业务需求开始将.NET Rx迁移到JVM上面。并于13年二月份正式向外展示了RxJava。
从语义的角度来看,RxJava就是.NET Rx。从语法的角度来看,Netflix考虑到了对应每个Rx方法,保留了Java代码规范和基本的模式。

 
RxJava来历

什么是RxJava

那么到底什么是RxJava呢?我对它的定义是:RxJava本质上是一个异步操作库,是一个能让你用极其简洁的逻辑去处理繁琐复杂任务的异步事件库。

RxJava好在哪

Android平台上为已经开发者提供了AsyncTask,Handler等用来做异步操作的类库,那我们为什么还要选择RxJava呢?

答案是简洁!RxJava可以用非常简洁的代码逻辑来解决复杂问题;而且即使业务逻辑的越来越复杂,它依然能够保持简洁!再配合上Lambda用简单的几行代码分分钟就解决你负责的业务问题。简直逼格爆表,拿它装逼那是极好的!

多说无益,上代码!

假设我们安居客用户App上有个需求,需要从服务端拉取上海浦东新区塘桥板块的所有小区Community[] communities,每个小区下包含多套房源List<House> houses;我们需要把塘桥板块的所有总价大于500W的房源都展示在App的房源列表页。用于从服务端拉取communities需要发起网络请求,比较耗时,因此需要在后台运行。而这些房源信息需要展示到App的页面上,因此需要在UI线程上执行。(此例子思路来源于扔物线的给Android开发者的RxJava详解一文)

new Thread() {
        @Override
        public void run() {
            super.run();
            //从服务端获取小区列表
            List<Community> communities = getCommunitiesFromServer();
            for (Community community : communities) {
                List<House> houses = community.houses;
                for (House house : houses) {
                    if (house.price >= 5000000) {
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                //将房子的信息添加到屏幕上
                                addHouseInformationToScreen(house);
                            }
                        });
                    }
                }
            }
        }
    }.start();

使用RxJava的写法是这样的:

Observable.from(getCommunitiesFromServer())
            .flatMap(new Func1<Community, Observable<House>>() {
                @Override
                public Observable<House> call(Community community) {
                    return Observable.from(community.houses);
                }
            }).filter(new Func1<House, Boolean>() {
                @Override
                public Boolean call(House house) {
                    return house.price>=5000000;
                }
            }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<House>() {
                @Override
                public void call(House house) {
                    //将房子的信息添加到屏幕上
                    addHouseInformationToScreen(house);
                }
            });

从上面这段代码我们可以看到:虽然代码量看起来变复杂了,但是RxJava的实现是一条链式调用,没有任何的嵌套;

整个实现逻辑看起来异常简洁清晰,这对我们的编程实现和后期维护是有巨大帮助的。特别是对于那些回调嵌套的场景。配合Lambda表达式还可以简化成这样:

  Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

简洁!有美感!这才是一个有情怀的程序员应该写出来的代码。

看完这篇文章大家应该能够理解RxJava为什么会越来越火了。它能极大的提高我们的开发效率和代码的可读性!

当然了RxJava的学习曲线也是比较陡的,在后面的文章我会对主要的知识点做详细的介绍,敬请关注!



 

1.1. Rxjava之变换操作符

Rxjava常见的变化操作符如下: 

1.map()变换符:转换操作符之map()

//通过map()操作符,对被观察者发送的每一个事件都,通过指定的Function对象的apply()进行转换处理
//将之前的事件类型转换成为 另外的一种事件类型。
//即通过map()操作符将我们的被观察者发送的事件转换成为 任意的类型的事件
//应用场景:数据类型的转换。

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
    //通过apply将事件的类型有integer转换成为了String
return String.valueOf("通过map()变换操作符将int类型的数据转换成为了String字符串类型的数据 : " + integer.intValue());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: thread = " + Thread.currentThread().getName());
}

@Override
public void onNext(String s) {
//最终在onNext中调用得到的是String类型的事件
Log.d(TAG, "onNext: " + s + " thread = " + Thread.currentThread().getName());
}
});
打印结果:

2.flatMap()变换操作符:转换操作符之flatMap()

//flatMap()操作符的作用是将被观察者发送的事件序列进行拆分 & 单独转换 在合并成为一个新的事件序列,最后在进行发送。
//原理:将被观察者发送的事件序列进行拆分成一个个事件 在将每个事件都生成创建一个新的Observable对象。
//每个原始事件都会生成一个新的Observable对象;
//每个拆分的新的事件生成的新的Observable对象,最终会汇总到一个新建总的Observable对象中。
//新建的总的Observable对象最终将新合并的事件序列,发送给观察者Observer。

//应用场景:无序的将整个被观察者,发送的事件序列进行变换
//注意:新生成的事件序列的顺序是无序的 和旧的事件序列的顺序无关


Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
ArrayList<String> transforDatas = new ArrayList<>();
for (int j = 0; j < 3; j++) {
Log.d(TAG, "apply: 被观察者的原始事件 " + integer + " 分解成的子事件 " + j + " thread : " + Thread.currentThread().getName());
}

return Observable.fromIterable(transforDatas);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: thread = " + Thread.currentThread().getName());
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s + " thread = " + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: thread = " + Thread.currentThread().getName());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: thread = " + Thread.currentThread().getName());
}
});

3.concatMap():变换操作符。

//转换操作符之concatMap()
final String[] courseName = {"语文","数学","英语"};
ArrayList<Student> students = new ArrayList<>();
for (int j = 0; j < 5; j++) {
List<Course> courses = new ArrayList<>();
for (int k = 0; k < 3; k++) {
courses.add(new Course(courseName[k] + k, (int) (Math.random() * 31 + 70)));
}
Student childStudent = new Student("何乐" + j, 22 + j, courses);
students.add(childStudent);
}

Observable.fromIterable(students).subscribe(new Consumer<Student>() {
@Override
public void accept(@NonNull Student student) throws Exception {
List<Course> courses = student.getCourses();
for (int j = 0; j < courses.size(); j++) {
Log.d(TAG, "accept: 学生姓名: " + student.getName() + " 学生年龄 : " + student.getAge() + " 学生学习课程名: "

+ courses.get(j).getCourseName() + " 学生该门成绩: " + courses.get(j).getScore());
}
}
});

Log.d(TAG, "------------------------------------------------------------------------------------------------");
//使用flatMap() / concatMap()直接获取Course对象,不用进行频繁的转换
//将被观察者发送的原始事件进行拆分 生成一个新的事件序列。

//concatMap()和flatMap()之间的不同点
//concatMap() 拆分 & 重新合并生成的事件序列的顺序和 旧的的事件序列的顺序是一致的。
//即新生成的事件序列的顺序和严格按照旧的事件序列的顺序来发射
Observable.fromIterable(students).concatMap(new Function<Student, ObservableSource<Course>>() {
@Override
public ObservableSource<Course> apply(@NonNull Student student) throws Exception {
return Observable.fromIterable(student.getCourses());
}
}).subscribe(new Consumer<Course>() {
@Override
public void accept(@NonNull Course course) throws Exception {
Log.d(TAG, "accept: 课程名 " + course.getCourseName() + " 课程成绩 " + course.getScore());
}
});

4. switchMap()转换操作符

//转换操作符之switchMap()
//switchMap()操作符和flatMap()操作符的功能很像,除了一点
//对于switchMap()每当源Observable发射一个新的数据项,它就取消订阅并停止监视之前的那个数据项
//变换产生的Observable,并开始监视当前最新发射的这个数据项

//使用测试调度器
final TestScheduler testScheduler = new TestScheduler();

Observable.just(10,20,30)
.switchMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
int delay = 0;
// if(integer == 10){
// delay = 100;
// }
//
// if(integer == 20){
// delay = 50;
// }
//
// if(integer == 30){
// delay = 30;
// }

//生成一个随机产生的延迟值,来延迟变换三个上游发射的三个数据项
delay = new Random().nextInt(101);
return Observable.fromArray(new Integer[]{integer,integer / 2}).delay(delay,TimeUnit.MILLISECONDS,testScheduler);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer + " " + Thread.currentThread().getName());
}
});

//使用测试调度器直接将时间向前推进到1分钟的时间
testScheduler.advanceTimeBy(1,TimeUnit.MINUTES);
打印项:

 以上SwitchMap代码不管运行多少次打印的结果始终是30 15 不管10 20 30 这三个数据项在switchMap()中变换时生成时间的先后 说明对于switchMap()只要上游源Observable发射了新的数据就会停止订阅和监视之前的旧的数据项的变换产生的Observable,同时会直接订阅监视最新的这个数据项

5. flatMapIterable()转换操作符

//变换操作符之flatMapIterable()
//flatMapIterable()和flatMap()几乎是一样的,不同的是flatMapIterable()它转化的多个Observable是使用Iterable作为源数据的。
ArrayList<Integer> list1 = new ArrayList<>();
ArrayList<Integer> list2 = new ArrayList<>();
ArrayList<Integer> list3 = new ArrayList<>();
for(int i = 0;i < 10;i++){
list1.add(i);
list2.add(i);
list3.add(i);
}

Observable.just(list1,list2,list3)
.flatMapIterable(new Function<ArrayList<Integer>, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(@NonNull ArrayList<Integer> integers) throws Exception {
return integers;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
 

concatMapEager():变换操作符:

//变换操作符之concatMapEager()
//和flatMap()不同concatMap() concatMapEager()操作符能够保证最终变换生成的数据是有序的
Observable.just(1,2,3,4,5,6)
.concatMapEager(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
return Observable.just(String.valueOf(integer));
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});


6. buffer()转换操作符:

 //转换操作符之buffer()缓冲
//buffer()变换操作符可以Observable发射的事件序列进行缓冲转换成一个新的Observable对象
//而且新转换的Observable发射事件时是发射一组列表值而不是一个个发射
//buffer(count) 每次新生成的Observable事件序列中的事件数量为count个
Observable.just(1,2,3,4,5).buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
Log.d(TAG, "缓冲区数据");
for (int j = 0; j < integers.size(); j++) {
Log.d(TAG, "accept: " + integers.get(j));
}
}
});

Log.d(TAG, "--------------------------------------");
//buffer(count,skip) 每次新转换的Observable事件序列中的事件数量为count
//并且每次需要跳skip个事件数量的位置重新生成count个事件的新的Observable事件序列
Observable.just(1,2,3,4,5).buffer(3,2).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
Log.d(TAG, "缓冲区数据");
for (int j = 0; j < integers.size(); j++) {
Log.d(TAG, "accept: " + integers.get(j));
}
}
});
打印结果:

  

7. scan()操作符:  Scan()变换操作符

//Scan操作符对原始Observable发射的第一项数据应用一个函数,
// 然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。
// 它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator 累加器


Observable.just(1,2,3,4,5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
Log.d(TAG, "integer : " + integer + " integer2 : " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Log.d(TAG, "-----------------------------------------------------");

//scan操作符的变体,你可以传递一个种子值给累加器函数的第一次调用(Observable发射的第一项数据)
// 如果你使用这个版本,scan将发射种子值作为自己的第一项数据。 下游会比源Observable发射的数据项多接收响应一次数据多的就时初始种子值
Observable.just(1,2,3)
.scan(10, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
Log.d(TAG, "intgera : " + integer + " integerb : " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
   

8. groupBy()操作符: //变换操作符groupBy()

//GroupBy操作符将原始Observable分拆为一些Observables集合,
// 它们中的每一个发射原始Observable数据序列的一个子序列。
// 哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。
//RxJava实现了groupBy操作符。它返回Observable的一个特殊子类GroupedObservable,
//实现了GroupedObservable接口的对象有一个额外的方法 getKey,这个Key用于将数据分组到指定的Observable。

//注意:groupBy 将原始Observable分解为 一个发射多个GroupedObservable的Observable,一旦有订阅,
//每个GroupedObservable 就开始 缓存数据。 因此,如果你忽略这些GroupedObservable中的任何一个,
//这个缓存可能形成一个潜在的内存泄露。 因此,如果你不想观察,也不要忽略GroupedObservable。
//你应该使用像take(0)这样会丢弃自己的缓存的操作符。
//如果你取消订阅一个GroupedObservable,那个Observable将会终止。
//如果之后原始的Observable又发射了一个与这个Observable的Key匹配的数据,groupBy将会为这个Key创建一个新的GroupedObservable。
//groupBy默认不在任何特定的调度器上执行。

Observable.range(1,10)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return (integer >= 5) ? "大于5" : "小于5";
}
})/*.elementAt(1)*/ //只获取索引为1的那个位置的元素(默认索引从0开始)
.take(1) //只获取事件序列中的前一个事件和element()是不同的意思
.subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(@NonNull GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
String key = stringIntegerGroupedObservable.getKey();
Log.d(TAG, "accept: key : " + key + " groupObservable : " + stringIntegerGroupedObservable);

stringIntegerGroupedObservable
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.d(TAG, "accept: " + throwable.getMessage());
}
});

//传递一个变换函数,这样它可以在发射结果GroupedObservable之前改变数据项。
Observable.range(1,10)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer % 2 == 0 ? "偶数" : "奇数";
}
}, new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return String.valueOf(integer + " x");
}
}).subscribe(new Consumer<GroupedObservable<String, String>>() {
@Override
public void accept(@NonNull GroupedObservable<String, String> stringStringGroupedObservable) throws Exception {
Log.d(TAG, "accept: " + stringStringGroupedObservable.getKey());
if(stringStringGroupedObservable.getKey().equals("奇数")){
stringStringGroupedObservable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
}
});
打印结果:

9. window操作符:

//变换操作符window()
//window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,
// 而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理
//https://mcxiaoke.gitbooks.io/rxdocs/operators/Window.html
Observable.range(1,10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: thread : " + Thread.currentThread().getName());
}

@Override
public void onNext(@NonNull Observable<Integer> integerObservable) {
integerObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: inner thread : " + Thread.currentThread().getName());
}

@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: inner : " + integer + "thread : " + Thread.currentThread().getName());
}
});
}
}); 

  

原文地址:https://www.cnblogs.com/awkflf11/p/12528561.html