RxJava 以及 Android 中的通用线程解决方案、并发与线程安全

关于RxJava如今是熟到发紫了,所以对于它底层的动作机制的了解是迫在眉睫了,费话不多说,直接开始。

这里还是以之前获取个人github仓库列表为例,用retrofit+rxjava,也是实际项目中用得最多的,先来回顾一下当时【https://www.cnblogs.com/webor2006/p/10502230.html】在研究retrofit时所定义的API:

这里新建一个工程,深入之前先来学会RxJava的基本使用,先来声明API,首先得增加retrofit的支持,如下:

然后API定义如下:

具体使用:

运行:

以上不多说,只是回顾一下retrofit的用法,接下来改用RxJava的方式,先增加相关的依赖:

,首先API返回的由Call就得变为Single了,如下:

而在我的公司项目中貌似返回的Observable这个对像,如下:

关于这俩的区别在之后再说,先默认使用Single,修改了API之后,接下来使用一下:

此时编译运行一下:

报错了,貌似是无法从Call转换成Single对像,因为API如今是返回Single了:

所以,此时需要增加一个CallAdapter的配置,用来支持进行转换,如下:

然后再运行:

失败了。。这是为啥呢?我们知道其实RxJava是需要手动来切换线程的,所以先增加它,先不管其内部实现:

成功啦,目前由于在回调中木有操作View,如果在回调中操作View会怎么样呢,试试:

再编译运行:

很显然是由于是非UI线程更新View异常了,所以此时对于回调所在线程是在非UI线程里的,所以需要将其切换到主线程里,这里需要增加另外一个库了:

编译运行:

其中还有一个回调方法:

请求前的回调,所以可以在请求前来在界面中增加一个请求提示,如下:

其中这个回调有一个Disposable参数,它的意义简单说可以理解成取消订阅,具体可以这样用它:

好,对于利用Rxjava来实现网络请求的简单使用就到这,接下来则重点是来理解这个框架,当然不是先来直接分析这个代码,而是将其拆解,从最基础的使用上来理解,如下:

Single和Observable:

运行一下:

顺间就显示了,因为是本地的数据,对于纯小白来说看到这样的写法还是挺难理解的,其订阅关系是:

接下来就彻底来挼清这个简单代码的运作机理,先来看下它:

很明显是来对参数做判空,简单瞅下它的实现,很简单:

继续:

这句涉及到两个陌生的东东:SingleJust、RxJavaPlugins.onAssembly(),下面先来看下RxJavaPlugins.onAssembly()的细节:

看到泛型就晕。。也只能硬着头皮细细揣摩:

其中Function是传一个类型的类,返回另一个类型,熟悉Java8的亲们应该比较熟悉:

如果onSingleAssembly不为空则就执行转换了,而它是用户可以配置的,其实也就是是否要对Single对象进行进一步加工,默认肯定是不需要,所以,对于这句话可以简单理解:

所以此just()方法重点看到它就成了,所以点进去看一下SingleJust,如下:

那再看一下Single,发现是一个3000多行超级大的抽象类:

所以里面的细节就不必现跟了,总之记着SingleJust里面重写了subscribeActual()方法:

这个方法会在调用它时最终被调用的:

所以点击进去看是不是这样:

很显然最终会调用SingleJust中的subscribeActual()方法嘛,所以要理解这个回调是如何调用的:

就需要看SingleJust的subscribeActual()方法的具体实现细节啦,如下:

秒懂不,直接就调用了我们传的回调对象嘛,以上简单的示例虽说简单,但是完整的阐述了RxJava的一个订阅实现细节,也就是其比较核心的原理,很重要,再简单挼下:

此时会在里面生成一个SingleJust对像:

当调用它时,实际会调用SingleJust.subscribeActual()方法,而subscribeActual()方法最终又会回调我们的这个对象的回调方法,如下:

至此,整个这个示例原理就彻底理清楚了,接下来继续深入RxJava,此时就涉及到操作符相关的东东了,关于什么是操作符,对于熟悉Java8的亲们也是不会陌生滴,就不过多解释啦,这里以map转换符为例,如下:

此时运行效果一样,这里的map做了啥事件,就已经牵扯到RxJava的结构啦,下面用图来剖析下:

而在我们调用Single.subscribe()方法时,其调用其实是这样子滴:

好,那此时如果再加上一个map的转换操作符后,具体又是如何运转的呢?

具体是如何转换的呢?此时从图中切回到代码:

等于创建了一个新的SingleMap,也就是表象上看着还是Single对象,其实内部已经发生对象的转换了,当然需要将当前的Single对像给传进来:

而此时又回到图画一下当map()之后的形态变化:

接下来就瞅一下SingleMap干了啥?

其中source则是原Single对像,调用了它的subscribe方法,但是!!此时的SingleObserver对像变化了,变为了MapSingleObserver啦,此时的图片形态就变成了:

接下来则具体看下它里面是如何工作的:

接着执行onSuccess()方法:

当然还可以再多增加其它的操作符,总的来说源和目标肯定只有一个,中间的都是形成了一个链,发送会往源传,得到结果之后则从源往目标传,理解这个理念非常之重要。

说了Single,也谈一下Observable,Rxjava1时还木有Single,其实Single就是Observable的简化版本,它的回调就只观注开始和结束,而对于Observable而言,它的中间过程也会回调,如下:

其中的onNext()可能会被调用多次,了解一下区别既可。

Disposable:

在之前咱们已经稍微提到过它,其实是可以做取消订阅用的,下面用一个例子来实践一下,这里采用Observable来进行每秒间隔发事件,如下:

运行看一下效果:

此时增加一个按钮,在运行期间将其停止掉,这时就可以利用Disposable了,如下:

运行:

接下来很想知道Observable.subscribe()方法到底做了啥导致在onSubscribe()方法中得到了一个什么样的Disposable,才能知道Disposable是怎么工作的,不同的Observable它的Disposable是不一样的,所以咱们先来看一下目前这个Observable是一个什么样的Observable,看它:

点开看一下interval():

下面来细看一下:

此时就得查看一下IntervalObserver了:

也就是说:

调用的也就是IntervalObserver.dispose()方法,所以得研究一下它的具体实现:

然后看一下具体实现:

而对于IntervalObserver中的run()方法就会根据这个状态来做判断,如下:

没有中间操作符的Disposable工作方式都是这样的,没有一个传递的过程,接下来咱们来看一下有map操作的Disposable又是咋样的:

然后点击查看源码:

也就是最终:

调用的是MapObserver.dispose()方法,但是!!貌似在MapObserver中木有找到dispose()方法呀:

这就需要往父类进行查找喽,果真有,如下:

其中的s是?

那不就是源,也就是最终会调用源的dispose()方法,如下:

总结Disposable也就是两种场景:

1、原始的Disposable则是停掉自己的任务。

2、如果带有操作符的,则先停掉自己,然后再停掉原始的Disposable。

线程切换Scheduler:

关于线程切换有几处,咱们一个个来剖析,这里还是回到Single的程序来,首先来瞅下它:

点击进行:

其中核心是:

也就是我们调用subscribeOn所传的参数:

点进去瞅下:

具体它的方法先不分析,可以看到参数是需要一个Runnable,而由于SubscribeOnObserver实现了Runnable方法,所以当然可以将它传进去喽:

而它里面的run方法的实现:

也就是说:

此时我们的调用就会在指定的线程来做了,如下:

用图来说明一下这个subscribeOn()的过程:

那如果写多个subscribeOn()呢?

其实最终会用第一行的切线程,图例表示其形态:

而对于subscribeOn()之后的Disposable则为:

也就是subscribeOn()指定的线程是用来决定subscribe()的订阅操作的,那接下来看一下observeOn(),如下:

此时就看一下它的回调中瞅下是否有切线程的东东:

那如果有多个observeOn会有啥结果呢?由于是管下面的,可能会是这样:

那么就有东东可以利用啦,怎么利用?假如再插入一个操作符:

也就是使用observeOn()多次切是会生效的,而不像subscribeOn()使用多次是没啥效果的,那有了observeOn()的灵活性,subscribeOn()是不是可以不用了?当然不行,因为subscribeOn()是管订阅的,需要配合着使用。总的来说:subscribeOn()是先切线程再进行订阅,而observeOn()是先订阅,而每次回调会切线程处理

接下来再研究一下它们的原理:

首先来瞅一下Schedulers.io(),不过有一个跟它类似的api需要先看它:

先说一下它们俩的区别:

Schedulers.newThread():每次都会新开一个线程。

Schedulers.io():里面用到了线程池,不是每次都新开线程。

好,先来了解下Schedulers.newThread()是做了啥?

直接看NEW_THREAD:

 而切换线程在上面的分析中主要是这句代码:

所以咱们来瞅一下它里面有木有scheduleDirect()这个方法,发现木有。。那就肯定是在它的父类找呗,找找:

确实是有:

下面来简单分析一下:

然后核心代码:

点进去瞅一下:

抽象方法,肯定得看子类:

一层套一层,继续:

其实可以看出Scheduler就是包装Worker的东东,而Worker是包装了executor的东东,咱们再回过头来看:

就会调到子类:

再回过头来瞅一下DisposeTask:

好,了解了Schedulers.newThread()机制之后, 就可以来理解Schedulers.io()了,其实会有一个IoScheduler对像,如下:

然后它里面也有一个createWork()方法,瞅一下:

就不往里跟了,很明显确实io()就是带有缓存的,不是每次都new一个线程。

好接下来就看最后一个主线程的scheduler啦:

点击进来:

然后可以大致看一下切换细节:

至此!!关于Rxjava的核心机制原理都已经剖析完毕,还是挺麻烦!!

原文地址:https://www.cnblogs.com/webor2006/p/10545699.html