RxJava2|Single, Maybe 和 Completable

RxJava2 Single, Maybe 和 Completable

原文: https://www.jianshu.com/p/66a55abbadef

参考而做的二次实现. 并重排版一下他的总结, 在此做个记录好回顾. 本文没有原文那么详尽, 建议阅读原文.

前述

java-1.8

maven-3

rxjava-2.2.3

ObservaleFlowable都是用来发射数据流的(0..N), 然而出现了三个只有1个数据的基类: (https://www.jianshu.com/p/66a55abbadef)

  • Single - 只发射一条单一的数据,或者一条异常通知, 不能发射完成通知,其中数据与通知只能发射一个。
  • Compoletable - 只发射一条完成通知,或者一条异常通知,不能发射数据,其中完成通知与异常通知只能发射一个
  • Maybe - 可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。

示例(Single简单使用)

Single操作实现类 - HelloSingle.java

package yag;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloSingle {

    public void helloSingle(){
        Single
                .create((SingleOnSubscribe<Integer>) singleEmitter -> {
                    // 发射
                    singleEmitter.onSuccess(1);
                    singleEmitter.onSuccess(2);
                })
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onSuccess(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
    }
}

执行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloSingle helloSingle = new HelloSingle();
        helloSingle.helloSingle();
    }
}

执行结果

1

Process finished with exit code 0

只接收第一条信息.

小结

发送异常信息 - 使用onError()方法.

发射器接口SingleEmitter:

1、方法void onSuccess(T t)用来发射一条单一的数据,且一次订阅只能调用一次,不同于Observale的发射器ObservableEmitter中的void onNext(@NonNull T value)方法,在一次订阅中,可以多次调用多次发射。
2、方法void onError(Throwable t)等同于ObservableEmitter中的void onError(@NonNull Throwable error)用来发射一条错误通知
3、SingleEmitter中没有用来发射完成通知的void onComplete()方法。
方法onSuccessonError只可调用一个,若先调用onError则会导致onSuccess无效,若先调用onSuccess,则会抛出io.reactivex.exceptions.UndeliverableException异常。

观察者SingleObserver:

方法void onSubscribe(Disposable d)等同于Observer中的void onSubscribe(Disposable d)
方法void onSuccess(T t)类似于Observer中的onNext(T t)用来接收Single发的数据。
方法void onError(Throwable e)等同于Observer中的void onError(Throwable e)用来处理异常通知。
没有用来处理完成通知的方法void onComplete()

示例(Completable简单使用)

Completable操作实现类 - HelloCompletable.java

package yag;

import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloCompletable {

    public void helloCompletable(){
        Completable
                .create(new CompletableOnSubscribe() {
                    @Override
                    public void subscribe(CompletableEmitter completableEmitter) throws Exception {
                        completableEmitter.onComplete();
                    }
                })
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("执行完成");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
    }
}

执行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloCompletable helloCompletable = new HelloCompletable();
        helloCompletable.helloCompletable();
    }
}

执行结果

执行完成

Process finished with exit code 0

补充

onError() (发射异常信息), 由CompletableObserveronError().

示例( Maybe简单使用)

Maybe操作实现类 - HelloMaybe.java

package yag;

import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.disposables.Disposable;

public class HelloMaybe {
    
    public void helloMaybe(){
        Maybe
                .create(new MaybeOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(MaybeEmitter<Integer> maybeEmitter) {
                        maybeEmitter.onSuccess(1);
                        maybeEmitter.onComplete();
                    }
                })
                .subscribe(new MaybeObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onSuccess(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("执行完成");
                    }
                });
    }
}

执行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){

        HelloMaybe helloMaybe = new HelloMaybe();
        helloMaybe.helloMaybe();
    }
}

执行结果

1

Process finished with exit code 0
原文地址:https://www.cnblogs.com/shwo/p/9881826.html