java框架---->RxJava的使用(一)

  RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。

RxJava的简单使用

一、mavan的pom.xml中增加rxjava的依赖

这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.0</version>
</dependency>

二、测试的用类

import rx.Observable;
import rx.Subscriber;
/**
 * @author huhx
 */
public class RxJavaTest {
    public static void main(String[] args) {
        Observable<String> observable = Observable.just("One", "Two", "Three");
        observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable item) {
                System.out.println("onError");
            }

            @Override
            public void onNext(String item) {
                System.out.println("Item is " + item);
            }
        });
    }
}

三、打印的结果如下:

Item is One
Item is Two
Item is Three
onCompleted

RxJava源码的简单分析

根据上述的代码,我们简单分析一下程序的流程。

一、首先Observable.just("One", "Two", "Three")代码:

just是一个工厂方法,用心构建一个Observable对象。

public static <T> Observable<T> just(T t1, T t2, T t3) {
    return from((T[])new Object[] { t1, t2, t3 });
}

from方法的代码如下:

public static <T> Observable<T> from(T[] array) {
    int n = array.length;
    if (n == 0) {
        return empty();
    } else
    if (n == 1) {
        return just(array[0]);
    }
    return unsafeCreate(new OnSubscribeFromArray<T>(array));
}

二、我们重点是看unsafeCreate方法:

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。

static {
    init(); // 初始化了很多RxJavaHooks跟事件有着的变量
}

onCreate方法中,这个没有怎么看懂后续再会。

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

三、对于observable.subscribe()代码

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

Observable的subscribe方法代码如下:

 1 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 2     if (subscriber == null) {
 3         throw new IllegalArgumentException("subscriber can not be null");
 4     }
 5     if (observable.onSubscribe == null) {
 6         throw new IllegalStateException("onSubscribe function can not be null.");
 7     }
 8 
 9     subscriber.onStart();
10 
11     if (!(subscriber instanceof SafeSubscriber)) {
12         subscriber = new SafeSubscriber<T>(subscriber);
13     }
14 
15     try {
16         RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
17         return RxJavaHooks.onObservableReturn(subscriber);
18     } catch (Throwable e) {
19         Exceptions.throwIfFatal(e);
20         if (subscriber.isUnsubscribed()) {
21             RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
22         } else {
23             try {
24                 subscriber.onError(RxJavaHooks.onObservableError(e));
25             } catch (Throwable e2) {
26                 Exceptions.throwIfFatal(e2);
27                 RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
28                 RxJavaHooks.onObservableError(r);
29                 throw r; // NOPMD
30             }
31         }
32         return Subscriptions.unsubscribed();
33     }
34 }

四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法

对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。

public void call(Subscriber<? super T> child) {
    child.setProducer(new FromArrayProducer<T>(child, array));
}

setProducer方法执行的代码如下:

 1 public void setProducer(Producer p) {
 2     long toRequest;
 3     boolean passToSubscriber = false;
 4     synchronized (this) {
 5         toRequest = requested;
 6         producer = p;
 7         if (subscriber != null) {
 8             // middle operator ... we pass through unless a request has been made
 9             if (toRequest == NOT_SET) {
10                 // we pass through to the next producer as nothing has been requested
11                 passToSubscriber = true;
12             }
13         }
14     }
15     // do after releasing lock
16     if (passToSubscriber) {
17         subscriber.setProducer(producer);
18     } else {
19         // we execute the request with whatever has been requested (or Long.MAX_VALUE)
20         if (toRequest == NOT_SET) {
21             producer.request(Long.MAX_VALUE);
22         } else {
23             producer.request(toRequest);
24         }
25     }
26 }

这里会走到21行的代码,跟进去FromArrayProducer的request方法:

public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required but it was " + n);
    }
    if (n == Long.MAX_VALUE) {
        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
            fastPath();
        }
    } else
    if (n != 0) {
        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
            slowPath(n);
        }
    }
}

对于fastPath()的代码如下:

void fastPath() {
    final Subscriber<? super T> child = this.child;

    for (T t : array) {
        if (child.isUnsubscribed()) {
            return;
        }
        child.onNext(t);
    }
    if (child.isUnsubscribed()) {
        return;
    }
    child.onCompleted();
}

执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。

友情链接

原文地址:https://www.cnblogs.com/huhx/p/baseusejavarxjava1.html