Android主流框架——Rxjava (操作符与使用)

创建型操作符

package com.example.rxjavapractice.operators

import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.functions.Consumer

fun just() {
    // TODO 无需自己发射,创建即发射
    Observable.just("A", "B")
        .subscribe(object : Observer<String> {
            override fun onNext(t: String) {
                TODO("Not yet implemented")
            }

            override fun onSubscribe(d: Disposable) {
                TODO("Not yet implemented")
            }

            override fun onError(e: Throwable) {
                TODO("Not yet implemented")
            }

            override fun onComplete() {
                TODO("Not yet implemented")
            }
        })
}

fun empty() {
    // TODO 上游不发送任何事件,范型填Any,只会发射onComplete
    // TODO 进行耗时操作,不需要任何数据刷新UI,就可以用empty
    Observable.empty<Any>()
        .subscribe(object : Observer<Any> {
            override fun onSubscribe(d: Disposable) {
                TODO("Not yet implemented")
            }

            override fun onNext(t: Any) {
                TODO("Not yet implemented")
            }

            override fun onError(e: Throwable) {
                TODO("Not yet implemented")
            }

            override fun onComplete() {
                TODO("Not yet implemented")
            }

        })
}

fun range() {
    // TODO 从80开始,每次+1,发射8次,自动发射事件
    Observable.range(80, 8)
        .subscribe(object : Consumer<Int> {
            override fun accept(t: Int?) {
                TODO("Not yet implemented")
            }
        })
}

过滤型操作符

package com.example.rxjavapractice.operators

import io.reactivex.Observable
import io.reactivex.functions.Predicate
import java.util.concurrent.TimeUnit

fun filter() {
    // TODO 在上下游之间插入一个过滤器,返回为true即发射事件
    Observable.just(1, 2, 3)
        .filter(object : Predicate<Int> {
            override fun test(t: Int): Boolean {
                return t >= 2
            }
        })
}

fun take() {
    // TODO 限定事件源发射的事件次数
    Observable.interval(1, TimeUnit.SECONDS)
        .take(8)
}

fun distinct(){
    // TODO 过滤重复发射的事件
    Observable.just(1,1,2,
        2,3,3,4)
        .distinct()
}

fun elementAt(){
    // TODO 过滤制定事件的下标
    Observable.just(1,1,2,
        2,3,3,4)
        .elementAt(2)
}

合并型操作符

package com.example.rxjavapractice.operators

import io.reactivex.Observable
import io.reactivex.functions.Function3
import java.util.concurrent.TimeUnit

fun startWith() {
    // TODO 倒叙发射事件源
    Observable.just(1, 2, 3) // 3
        .startWith(Observable.just(100, 200, 300)) // 2
        .startWith(Observable.just(4, 5, 6)) // 1
}

fun concatWith() {
    // TODO 顺序发射事件源
    Observable.just(1, 2, 3) // 1
        .concatWith(Observable.just(100, 200, 300)) // 2
        .concatWith(Observable.just(4, 5, 6)) // 3
}

fun concat() {
    // TODO concat可以把最多4个事件源进行连接,顺序进行,实质上是把四个数据源合成了一个,串行发射数据
    Observable.concat(
        Observable.just(1),
        Observable.just(2),
        Observable.just(3),
        Observable.just(4, 5, 6, 7)
    )
}

fun merge() {
    // TODO start:开始累计;count:累计数量;initialDelay:初始时延;period:事件间隔时间
    Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)

    // TODO merge将最多四个数据源合并成一个数据源,各个子数据源并行发射数据
    Observable.merge(
        Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS),
        Observable.intervalRange(6, 5, 1, 1, TimeUnit.SECONDS),
        Observable.intervalRange(11, 5, 1, 1, TimeUnit.SECONDS),
        Observable.intervalRange(16, 5, 1, 1, TimeUnit.SECONDS)
    )
}

fun zip() {
    val observable1 = Observable.just(1, 2, 3)
    val observable2 = Observable.just("1", "2", "3")
    val observable3 = Observable.just(1.0, 2.0, 3.0)

    // TODO,把几个数据源的数据合并成一个数据源,且发送的事件也进行合并
    Observable.zip(
        observable1,
        observable2,
        observable3,
        object : Function3<Int, String, Double, String> {
            override fun apply(t1: Int, t2: String, t3: Double): String {
                return ""
            }
        })
}

转换型操作符

package com.example.rxjavapractice.operators

import android.util.Log
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe
import io.reactivex.ObservableSource
import io.reactivex.functions.Consumer
import io.reactivex.functions.Function
import io.reactivex.observables.GroupedObservable
import java.util.concurrent.TimeUnit

const val TAG = "TranslateOperators"

fun map() {
    // TODO 上游Int -> map(Int, String) -> 下游String
    Observable.just(1)
        .map(object : Function<Int, String> {
            override fun apply(t: Int): String {
                return t.toString()
            }
        })
        .map(object : Function<String, Int> {
            override fun apply(t: String): Int {
                return t.toInt()
            }
        })
}

fun flatMap1() {
    // TODO 将每次事件变成一个事件源,事件源可再次向下游发送事件
    Observable.just(1, 2, 3)
        .flatMap(object : Function<Int, ObservableSource<String>> {

            override fun apply(t: Int): ObservableSource<String> {
                // TODO 返回一个事件源,ObservableSource是Observable的父类,可再次发射事件
                return Observable.create(object : ObservableOnSubscribe<String> {
                    override fun subscribe(emitter: ObservableEmitter<String>) {
                        emitter.onNext("flatMap: $t")
                        emitter.onNext("flatMap: $t")
                        emitter.onNext("flatMap: $t")
                    }
                })
            }

        })
}

fun flatMap2() {
    // TODO flatMap是不排序的
    Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: ObservableEmitter<Int>) {
            emitter.onNext(3)
            emitter.onNext(2)
            emitter.onNext(1)
        }
    }).flatMap {
        val list = mutableListOf<String>("11", "22", "33")
        Observable.fromIterable(list)
            .delay(it.toLong(), TimeUnit.SECONDS)
    }
}

fun concatMap() {
    // TODO concatMap是排序的
    Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: ObservableEmitter<Int>) {
            emitter.onNext(3)
            emitter.onNext(2)
            emitter.onNext(1)
        }
    }).concatMap {
        val list = mutableListOf<String>("11", "22", "33")
        Observable.fromIterable(list)
            .delay(it.toLong(), TimeUnit.SECONDS)
    }
}

fun groupBy() {
    // TODO 可以对一级事件源进行分组,每组都是二级事件源
    Observable.just(1, 2, 3, 4, 5)
        .groupBy(object : Function<Int, String> {
            override fun apply(t: Int): String {
                return if (t > 3) "高分组" else "低分组"
            }
        })
        .subscribe(object : Consumer<GroupedObservable<String, Int>> {
            // TODO 这一层是分组,t作为事件源,可以向下游发射组内元素
            override fun accept(t: GroupedObservable<String, Int>?) {
                Log.d(TAG, "groupBy分组名: ${t?.key}")
                // TODO 这一层才是组内元素
                t?.subscribe {
                    Log.d(TAG, "组内元素:${it}")
                }
            }
        })
}
原文地址:https://www.cnblogs.com/zsben991126/p/14353770.html