RxJava2|Observable

RxJava2 Observable

前述

java-1.8

maven-3

rxjava-2.2.3

我也不知道称呼为基类好不好...

官方介绍.

0...N flows, 但不支持背压.

示例(Observable的简单使用)

RxJava是基于观察者模式的. Observable是一个被观察者(它的观察者是Observer).

Observable的操作实现类中会生成6份信号(由while实现)并发射, 可它的观察者Observer则只接收前4份信号, 并逐一打印(也就是处理).

Observable的操作实现类 - HelloObservable.java

package yag;

import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

/**
 * @author Senyag
 */
public class HelloObservable {

    public void helloObservable(){

        // 代码这样排个人觉得会直观一些.

        // 初始化Observable
        Observable
                // create()操作符: 通过以编程方式调用observer方法从头开始创建一个Observable
                .create((ObservableOnSubscribe<Integer>) observableEmitter -> {
                    //observableEmitter: 发射器
                    Integer i = 0;
                    while ( i < 7){
                        i++;
                        observableEmitter.onNext(i);
                    }
                })
            
           
                // subscribe()操作符: 根据Observable的发射和通知进行操作
                .subscribe(new Observer<Integer>() {  // Observer 就是观察者

                    private Disposable mDisposable;

                    @Override
                    public void onSubscribe(Disposable disposable) {
                        mDisposable = disposable;
                    }

                    @Override
                    public void onNext(Integer i) {
                        if (i == 5){
                            // mDisposable可以切断操作, 让Observer不再接收信息.
                            mDisposable.dispose();
                        }else {
                            System.out.println("现在接收到的信号是: 第" + i + "信号");
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
}

执行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloObservable helloObservable = new HelloObservable();
        helloObservable.helloObservable();
    }
}

执行结果

现在接收到的信号是: 第1信号
现在接收到的信号是: 第2信号
现在接收到的信号是: 第3信号
现在接收到的信号是: 第4信号

Process finished with exit code 0

小结

用到了两个操作符: create()(创建发射器)和subscribe()(处理所发射的请求). 官方中针对这些操作符给出了特定的一页来介绍它们: 传送门

原文地址:https://www.cnblogs.com/shwo/p/9872611.html