[EventBus源码解析] 订阅者处理消息的四种ThreadMode

前言

  在前面,我们探讨了如何在自己的代码中引入EventBus,进行基本的事件分发/监听;对注册观察者与事件发送的过程进行了浅析。从之前的学习中,我们了解到,EventBus一共有4种onEvent方法,要根据实际需求的不同选用不同的事件处理方法。本篇blog中,我们集中研究一下这四种事件处理方法内部分别做了什么事情,是如何实现的。

  本篇会较多涉及java中的并发/多线程技术,这部分基础不够扎实的朋友,可以趁机黑练一下。(其实是说我自己 -_-!)

post的最后一步

  EventBus.post 是分发事件时调用的方法,post时,根据EventType找到对应的Subscription列表,遍历列表依次调用postToSubscription。方法如下

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case PostThread:
                invokeSubscriber(subscription, event);
                break;
            case MainThread:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BackgroundThread:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case Async:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

  根据EventBus提供的4种消息处理方式(ThreadMode)进行switch/case,这4种ThreadMode的含义,在之前的文章中已经给出过说明。结合代码,能看出在这四个case语句里面,处理方式无非以下两种:

  1. 若当前线程是ThreadMode所指明的线程,直接调用 invokeSubscriber(subscription, event);
  2. 当前线程不是ThreadMode所指明的线程,将 subscription、event 加入到目标线程的队列中(enqueue)。

  接下来逐一分析这两种处理方式。

交由当前线程处理

  从方法名与方法所起的作用来看,invokeSubscriber一定是用到了反射机制,代码中是如何做的呢?

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

  如此简单,subscription中保存了之前解析出的以“onEvent”开头的方法,直接调用之。当然要catch住invoke可能会抛出的三个Exception:IllegalArgumentException, InvocationTargetException, IllegalAccessException,不知为何这里只catch了其中两个。我判断原因是,之前解析subscription时,已经就参数类型做过过滤了,可以保证传入的event一定是方法所需要的,所以就无需再去catch参数类型不匹配的Exception。

交由非当前线程处理

  在EventBus中,声明了如下三个Poster

    private final HandlerPoster mainThreadPoster;
    private final BackgroundPoster backgroundPoster;
    private final AsyncPoster asyncPoster;

  这三个Poster里,都有enqueue方法,用来将事件压入队列。但是这三种处理方式是略有不同的,依次来看。

HandlerPoster

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

  HandlerPoster,人如其名,继承了Handler类,内部维护了一个PendingPostQueue,每当有时间enqueue时,判断当前Poster是否处于激活状态。对于未激活的将其激活。激活中的Poster,会通过sendMessage(obtainMessage())来依次处理PendingPostQueue中的事件,这是通过重载Handler中的handleMessage来做的。

BackgroundPoster

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

  与HandlerPoster不同的是,BackgroundPoster并没有继承自Handler,而是实现了Runnable接口。这么做的原因是,在用到BackgroudPoster的场合,必须是新建一个进程来处理事件,这里就使用了eventBus.getExecutorService().execute(this),从线程池里取出线程来执行。可以看到后面的AsyncPoster也是采用类似的方法。

AsyncPoster

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

  同样是用Runnable来实现,为啥AsyncPoster的代码比BackgroundPoster的简单这么多呢?仔细对比一下,就会发现BackgroudPoster中多出的代码,都是用来处理同步的。所以,原因在于,BackgroudPoster必须保证事件处理的顺序,先进先出。而AsyncQueue则没有这个顾虑。所以在需要按顺序处理事件的场合,就不要使用AsyncPoster啦!

小结

  本篇blog简单介绍了四种ThreadMode内部的机制,如果需要执行事件的线程是当前线程,则直接用反射调用方法;否则将事件压入对应Poster的队列中,依次(HandlerPoster,BackgroundPoster)或异步(AsyncPoster)执行。

下期预告

  从目录结构上,对EventBus项目进行整体解析。

原文地址:https://www.cnblogs.com/maozhige/p/4755920.html