RxJava总结(原)

1、RxJava的作用  

    RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

    It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away   concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

    说白了就是大家都说的让你能一眼看出来你之前做什么,之后做什么。它不是简化了代码,而是简化了逻辑。它具有线程安全性。

2、RxJava的原理

   RxJava是基于观察者模式的。用户定义Observer之后,提供一个Observerble,由Observerble进行subscribe,就安心等待结果就行。

   RxJava可以进行事件的排序操作,使用了decorate设计模式和拦截器模式进行操作。在操作的过程中将过程大量的封装为OnSubscribe对象,配合Scheduler完成包装。

  

使用下面一行代码开启循环调用,直至完成事件。

    

Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
    // new Subscriber created and being subscribed with so 'onStart' it
    st.onStart();
    onSubscribe.call(st);
} catch (Throwable e) {
    ...
}

 

3、RxJava的实现

  

4、RxJava的流程

  

5、RxJava的使用

   5.1 简单的使用

    5.1.1 创建观察者

    5.1.2 创建被观察者

    5.1.3 调用subscribe方法进行订阅。

private void subscribe1() {
        //创建一个观察者
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onCompleted() {
                Log.i(TAG, "Completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "Error");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, s);
            }
        };
        //使用Observable.create()创建被观察者
        Observable observable1 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("Wrold");
                subscriber.onCompleted();
            }
        });
        //订阅
        observable1.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.io()).doOnNext(new Observable.OnSubscribe() {
            @Override
            public void call(Object o) {
                System.out.println(o.toString());
            }
        }).subscribe(observer);
    }

  5.2 Retrofit和RxJava的配合:Retrofit和RxJava配合使用时,Retrofit的CallAdapter使用RxJavaCallAdapterFactory生成返回值为Observable<T>的对象。使用该对象进行subscribe操作即可。    

    5.2.1 定义返回数据      

public class UserInfo {
    public String username;
    public String password;
}

    5.2.2 定义接口,返回值为Observable<T>,即为被观察者对象

interface MyService {
    @GET("user/login")
    Observable<UserInfo> login(
            @Query("username") String username,
            @Query("password") String password
    );
}

    5.2.3 生成Retrofit对象,配置CallAdapterFactory为RxJavaCallAdapterFactory。

Retrofit retrofit = new Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())//新的配置
                .baseUrl(BASE_URL)
                .build();

    5.2.4 生成代理类,即被观察者

MyService service = retrofit.create(MyService.class);

    5.2.5 配置事件链和Scheduler,订阅;

String phone = "123";
        String password = "123";
        service.login(phone, password)               //获取Observable对象
                .subscribeOn(Schedulers.newThread())//请求在新的线程中执行
                .observeOn(Schedulers.io())         //请求完成后在io线程中执行
                .doOnNext(new Action1<UserInfo>() {
                    @Override
                    public void call(UserInfo userInfo) {
                        saveUserInfo(userInfo);//保存用户信息到本地
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//最后在主线程中执行
                .subscribe(new Subscriber<UserInfo>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        //请求失败
                    }

                    @Override
                    public void onNext(UserInfo userInfo) {
                        //请求成功
                    }
                });
原文地址:https://www.cnblogs.com/weizhxa/p/9769985.html