RXJava2响应式编程框架设计<一>---Rxjava2入门

关于Rxjava在之前https://www.cnblogs.com/webor2006/p/10545699.html已经对它的原理啥的有了一定的研究了,这次准备再对它进行进一步深入的研究,这里会从一个基础到原理的完整流程中重新审视它,并且最终来手写一下整个Rxjava框架的核心框架,进一步加深对该框架的理解。

前言:

什么是RxJava?

这其实是费话,基本上只要从事Android开发的人基本上都知道,不过还是上官网瞅一下它的定义:https://github.com/ReactiveX/RxJava

言外之意就是说它是一个实现异步操作的库。 

RxJava好在哪?

RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式【它是定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则所有依赖于它的对象都会得到通知并被自动更新】的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。关于平常我们处理异步的代码一般都会放到Thread或者AsyncTask当中,但是使用传纺的方式嵌套比较深,而且可读性也不是很好,而如果改用RxJava的风格,则就跟那个Builder模式一样一路链式顺下来,但是呢RxJava有一定的学习成本,如果第一次接触它还是会有很多不适应的,但是适应了之后关于异步风格的代码用它来改写还是相当不错的,我实际项目中用得也不是特别的多,学习好它之后希望在未来的工作中能尽量来多使用使用它。

适用场景: 

数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现。 

Rxjava2基本概念:

三个基本元素:

下面贴一个能体现上面三个元素的RxJava的代码,暂且先不用理会具体的含义:

原理剖析:

上面这三个元素其实就组成了观察者模式的一个结构,下面借用博主的这篇https://www.jianshu.com/p/a51aa39c30ab里所举的关于顾客在饭店就餐的生动例子来理解一下这三个元素之间的关系,对于就餐的整个流程可以用状态图描述如下:

而用流程图来表示则为:

而对于RxJava而言它是一种扩展的观察者模式,它有如下4个角色:

那,上面这四个角色在上面这个生活化的例子都是如何扮演的呢?下面看一下(看标红处):

总结一下其实就是:

RxJava2基本使用:

下面以上面阐述原理用到的顾客就餐的生活场景来使用一下RxJava2,首先肯定得要先来导下它的依赖包,RxJava目前最新有3版本了:

这里我们学习还是用RxJava2.x版本,毕境真实商用项目用这个版本的还是占大多数,另外RxJava3.x肯定是基于RxJava2.x进行升华的,所以从学习的角度先掌握2.x是顺其自然的事,这里咱们准备用"2.1.14"版本,如下:

然后对于RxJava的使用方式有两种,如博主的描述:

而实际用的实现一般都会采用第二种,链式调用,不过这里学习两种都来搞一下:

方式1:分步骤实现

先来看一下整体的编写步骤:

步骤1:创建被观察者 (Observable )& 生产事件

即 顾客入饭店 - 坐下餐桌 - 点菜,实现如下:

public class ExampleUnitTest {
    @Test
    public void test() {
        // 1. 创建被观察者 Observable 对象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // create() 是 RxJava 最基本的创造事件序列的方法
            // 此处传入了一个 OnSubscribe 对象参数
            // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

            // 2. 在复写的subscribe()里定义需要发送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 通过 ObservableEmitter类对象产生事件并通知观察者
                // ObservableEmitter类介绍
                // a. 定义:事件发射器
                // b. 作用:定义需要发送的事件 & 向观察者发送事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });

    }
}

对于具体的原理啥的都不用管,目前先对其有个感性的认识既可。

步骤2:创建观察者 (Observer )并 定义响应事件的行为

即 开厨房 - 确定对应菜式,发生的事件类型包括:Next事件、Complete事件 & Error事件。具体如下:

具体实现:

public class ExampleUnitTest {
    @Test
    public void test() {
        // 1. 创建被观察者 Observable 对象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // create() 是 RxJava 最基本的创造事件序列的方法
            // 此处传入了一个 OnSubscribe 对象参数
            // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

            // 2. 在复写的subscribe()里定义需要发送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 通过 ObservableEmitter类对象产生事件并通知观察者
                // ObservableEmitter类介绍
                // a. 定义:事件发射器
                // b. 作用:定义需要发送的事件 & 向观察者发送事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });


        // 3. 创建观察者 (Observer )对象
        Observer<Integer> observer = new Observer<Integer>() {
            // 4. 创建对象时通过对应复写对应事件方法 从而 响应对应事件

            // 观察者接收事件前,默认最先调用复写 onSubscribe()
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }

            // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
            @Override
            public void onNext(Integer value) {
                System.out.println("对Next事件作出响应" + value);
            }

            // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
            @Override
            public void onError(Throwable e) {
                System.out.println("对Error事件作出响应");
            }

            // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }
        };


    }
}

步骤3:通过订阅(Subscribe)连接观察者和被观察者

即 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调,具体实现比较简单,一句话搞定:

public class ExampleUnitTest {
    @Test
    public void test() {
        // 1. 创建被观察者 Observable 对象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // create() 是 RxJava 最基本的创造事件序列的方法
            // 此处传入了一个 OnSubscribe 对象参数
            // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

            // 2. 在复写的subscribe()里定义需要发送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                System.out.println("subscribe() 开始发送事件");
                // 通过 ObservableEmitter类对象产生事件并通知观察者
                // ObservableEmitter类介绍
                // a. 定义:事件发射器
                // b. 作用:定义需要发送的事件 & 向观察者发送事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });


        // 3. 创建观察者 (Observer )对象
        Observer<Integer> observer = new Observer<Integer>() {
            // 4. 创建对象时通过对应复写对应事件方法 从而 响应对应事件

            // 观察者接收事件前,默认最先调用复写 onSubscribe()
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }

            // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
            @Override
            public void onNext(Integer value) {
                System.out.println("对Next事件作出响应" + value);
            }

            // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
            @Override
            public void onError(Throwable e) {
                System.out.println("对Error事件作出响应");
            }

            // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }
        };

        //4、通过订阅(Subscribe)连接观察者和被观察者
        observable.subscribe(observer);

    }
}

运行一下:

对于使用过RxJava的人来说这代码是非常之简单的,至于它内部的原理之后再来对其剖析。 

方式2:优雅的实现方法 - 基于事件流的链式调用

这种方式就是实际最常见的使用方式了,也就是会将上述分步的代码用一条链式调用的方式来使用,如下:

关于观察者的回调的调用时机这里说明一下:

对于被观察者的这个方法中:

根据这条的说明:

验证一下,看是不是onNext()则不会继续发送了呢?

RxJava2观察者&操作符纵览:

RxJava2被观察者纵览:

在上面的例子中我们是用的Observable来创建的被观察者,其实总共有5种创建方式,下面来瞅一下:

  • Observable,即被观察者,决定什么时候触发事件以及触发怎样的事件。

  • Flowable,可以看成是Observable的实现,只是它支持背压。
    咱们来使用一下:

    关于背压是啥在之后会专门来学习的,这个我记得当时在工作中还遇到过因为背压问题导致APP崩溃的情况,这里简单先了解一下,就是假如我们发送了N多个next()消息,如果消息处理不及时的话则就有可能产生消息堆积最终导致内存泄漏的问题,比如:

    我记得之前在工作中遇到崩溃的场景就是要在上游中定时每隔几秒要发送消息给下游,而下游处理时是需要连网请求服务器的,时间长了则就有可能崩溃,最终解决就是使用了背压策略解决的,这块在之后在细说。
  • Single,只有onSuccess可onError事件,只能用onSuccess发射一个数据或一个错误通知,之后再发射数据也不会做任何处理,直接忽略。
    这个比较简单,之前在分析RxJava时也用过:
  • Completable,只有onComplete和onError事件,不发射数据,没有map,flatMap操作符。常常结合andThen操作符使用。
    简单看一下,貌似用得较少:

    其中说到木有map操作符,确定一下:

    确实是木有。 

  • Maybe,没有onNext方法,同样需要onSuccess发射数据,且只能发射0或1个数据,多发也不再处理。

注意:五种被观察者可通过toObservable,toFlowable,toSingle,toCompletable,toMaybe相互转换,Single、Completable、Maybe它们其实是简化版的Observable。

RxJava2操作符纵览:

RxJava中最令人头疼的就是操作符了,啥是操作符?

以上这些全是属于操作符,所以下面先来纵览一下,总体分为五大类型:创建操作符、转换操作符、组合操作符、功能操作符、过滤操作符、条件操作符。下面来具体看一下,会全部列出来,但是呢只会挑常用的去使用一下:

创建操作符:

from操作符:

其实对于just操作符它里面的实现就是调有的fromArray,瞅一下:

转换操作符:

其中对于map是最常用的,其实对于了解过Java8的特性的就知道,转换操作符其实传递的是一些函数式接口,这里就不一一说明了,待实际工作中遇到了再来进行查阅。

组合操作符:

concat操作符:

这个在工作中有遇到过,所以下面简单使用一下:

功能操作符:

其中比较重要的是subscribeOn和observeOn():

过滤操作符:

filter:

条件操作符:

all:

 

以上是关于操作符的一个纵览,实在是太多了,里面有很多都平常不怎么用的,贴出来当个字典查阅,待未来实际工作中再根据实际需要再来进行学习。

原文地址:https://www.cnblogs.com/webor2006/p/12329139.html