EventBus源代码解析:2、消息的发布与处理

在上一篇文章《EventBus源代码解析:1、初始化与订阅者注册》当中,我们主要分析了两个事情:

  • EventBus初始化
  • 订阅者的注册

我们通过分析,EventBus在初始化的时候,初始化了几个集合,分别用来根据EventType和Event Handler所在的类索引对应的Handler方法;并且也同时初始化了用于不同ThreadMode的Poster。订阅者在注册的时候,EventBus会解析要注册的类,分析其所有的方法,从中找出Event的Handler方法(即public修饰的以onEvent开头),然后根据EventType保存到相应的List中。 但我们一直没有分析到一个分析,那就是,EventBus到底是如何去Post消息的呢?接下来我们就去分析这个问题。按照我们的老套路,还是从最常用的代码入手:

1
2
MessageEvent event = new MessageEvent(System.currentTimeMillis(), "Message Sequence " + mSequence.getAndIncrement());
EventBus.getDefault().post(event);

这个代码主要分为两步,第一步是构建了一个需要处理的Event即MessageEvent,根据我们之前的分析,在我们调用register()方法的时候,EventBus会解析并将MessageBus作为key保存在一个HashMap中。通过调用EventBus.getDefault().post(event)方法,EventBus会自动调用我们的onEvent方法,我们这里的实现如下:

1
2
3
4
5
6
7
8
9
10
// Called in Android UI's main thread
public void onEventMainThread(MessageEvent event) {
mMessages.add("onEventMainThread Receive : " + event);
mAdapter.notifyDataSetChanged();
}

public void onEvent(MessageEvent event) {
Log.i(TAG, "Thread name : " + Thread.currentThread().getName());
mHandler.obtainMessage(MESSAGE\_WHAT\_MESSAGEEVENT, event).sendToTarget();
}

那这一个过程当中,EventBus都做了哪些事情呢?我们先来看看post()方法的源代码:

post(Object event)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void post(Object event) {

// 获取当前线程(调用post方法的线程)中的一个PostingThreadState实例
PostingThreadState postingState = currentPostingThreadState.get();
// 获取当前线程(调用post方法的线程)中的EventQueue
List<Object> eventQueue = postingState.eventQueue;
// 将Event添加到队列当中
eventQueue.add(event);

// 如果当前线程(调用post方法的线程)没有在发布Event
if (!postingState.isPosting) {
// 判断调用者是否工作在主线程上
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
// 设置标志,正在发布Event
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 依次发送,直至eventQueue为空为止
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

我们依次来分析每行代码的作用:

  1. PostingThreadState postingState = currentPostingThreadState.get();

    1. 这一行代码中,有一个PostingThreadState的定义,我们首先要搞明白PostingThreadState到底是怎么回事?

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      /\*\* For ThreadLocal, much faster to set (and get multiple values). */
      final static class PostingThreadState {
      // eventQueue
      final List<Object> eventQueue = new ArrayList<Object>();
      // 标志:正在发布?
      boolean isPosting;
      // 标志:是主线程?
      boolean isMainThread;
      // 订阅者
      Subscription subscription;
      // Event
      Object event;
      // 已经取消
      boolean canceled;
      }
我们可以看到,原来这是一个静态不可变类,按照作者的描述,该类的作用用来提高性能,用于ThreadLocal,可以更快的去设置获取读取多个值。等下ThreadLocal在哪里呢?我们继续看,

1
2
3
4
5
6
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
这是currentPostingThreadState的定义,我们可以看到,currentPostingThreadState定义为一个ThreadLocal对象,其内容是PostingThreadState对象,可以看到其中的initialValue()方法返回了一个新的PostingThreadState()对象,这是什么意思呢?换句话每当从一个线程调用currentPostingThreadState.get()方法的时候,系统会检查当前线程是否有一份PostingThreadState实例,如果没有则新建一个,再换句话说,每一个线程中都有其独一无二的一个PostingThreadState实例。那这个用来做什么呢?我们继续分析。
  1. List eventQueue = postingState.eventQueue;

    1. 这一行代码就比较有意思,post方法从postingState中获取了一个eventQueue,我们再回到刚刚去看一下PostingThreadState中关于eventQueue的定义

      1
      final List<Object> eventQueue = new ArrayList<Object>();

      可以看到,每一个新的PostingThreadState对象中都有自己的一个eventQueue对象,并单独指向一个ArrayList,什么意思呢?我们可以明白,每个线程中都有自己的一份PostingThreadState拷贝,那么换言之,每一个线程中,同样有这样一个自己专属的eventQueue。也就是说代码List eventQueue = postingState.eventQueue;其实是获取了当前线程中的对应的eventQueue。(注意哦,EventBus中所有线程中消息的发送都是可以通过这个post方法实现的)

  2. eventQueue.add(event);
    1. 这段代码就相当简单了,将event添加到自己所在线程的eventQueue当中。
  3. 继续往下看,发现是一个判断

    1
    if (!postingState.isPosting)

    那么我们就根据分支来分析:

    1. ture:也就是说postingState.isPosting=false;也就是说,当前的线程没有在发布event,则进入以下的流程:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
      postingState.isPosting = true;
      if (postingState.canceled) {
      throw new EventBusException("Internal error. Abort state was not reset");
      }
      try {
      while (!eventQueue.isEmpty()) {
      postSingleEvent(eventQueue.remove(0), postingState);
      }
      } finally {
      postingState.isPosting = false;
      postingState.isMainThread = false;
      }
    我们来分析下,这里面都做了什么的工作呢?
    1.  首先会先判断当前的线程是否是在UI线程上,为什么要判断呢?什么这个还要问吗?UI线程不能做太多事情当然要小心处理啊!!!所以通过代码
        
1
Looper.getMainLooper() == Looper.myLooper()
来了解。 2. 然后呢?判断当前的线程是否已经canceled,也就是当前的线程是否已经unregistered了,这个容易理解,不去细细分析。 3. 然后进入一个while循环
1
2
3
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
这个代码也很明了啊,就是不断的通过postSingleEvent发送Event直至队列尾空为止。 2. false:哦,这里应该写另外分支要做的事情,额,如果当前的线程正在发送,那么就不去做任何事情了,等待上一次的while循环处理就好。

所以,通过上面的分析,我们发现最终代码还是进入

1
postSingleEvent(eventQueue.remove(0), postingState);

对所有的消息进行发送处理,那我们继续分析一下这一个方法就好了。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error

首先看源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 发送单个的Event
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 获取event的类
Class<?> eventClass = event.getClass();
// 标记,用来表示是否已经找到event对应的订阅者
boolean subscriptionFound = false;
// 判断event是否开启继承?
if (eventInheritance) {
// 查找Event对应的所有的event类型(包括父类和接口)。
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
// 获取其中的一个Event类型(Event对应的类或者其父类或者其实现的接口的一种)
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 如果没有开启Event继承,则直接根据Event的类型,在指定的线程中发送Event即可。
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

其实我们大体上一看就知道,这段代码也并没有执行具体的消息发送,但做了很必要的预处理工作,那都有哪些工作呢?我已经在程序里面加了很多注释了,我们可以很容易的发现,其实关键的代码在里面的那个if..else…分支语句里面,我们来依次来看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
if (eventInheritance) {
// 查找Event对应的所有的event类型(包括父类和接口)。
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
// 获取其中的一个Event类型(Event对应的类或者其父类或者其实现的接口的一种)
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 如果没有开启Event继承,则直接根据Event的类型,在指定的线程中发送Event即可。
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}

两个分支有什么不同呢?eventInheritance表示用户是否开启Event继承,如果不开启,则通过

postSingleEventForEventType(event, postingState, eventClass);

方法发送Event,如果开启,则首先通过

lookupAllEventTypes(eventClass);

查找event类所有的Event类型,然后依次通过代码

postSingleEventForEventType(event, postingState, eventClass);

进行消息处理,所以,我们分两步来,首先来看看lookupAllEventTypes的代码。

private List<Class<?>> lookupAllEventTypes(Class<?> eventClass)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<Class<?>>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

其实这个代码并不负责,简单的理解就是根据Event类查找其父类,然后添加到eventTypesCache里面。那另外一个函数干什么的呢?其实真正的工作都在postSingleEventForEventType(event, postingState, clazz);里面,我们去看看。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根据Event类型获取其对应的订阅者。
subscriptions = subscriptionsByEventType.get(eventClass);
}
// 如果存在订阅者
if (subscriptions != null && !subscriptions.isEmpty()) {
// 依次发送给对应的订阅者
for (Subscription subscription : subscriptions) {
// 设置post()方法调用线程中对应的postingState
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 将event发送给对应的订阅者。
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

什么?我们再一看,好吧,这个代码也没有去真正的发送消息,那我们看看这个方法到底做了什么工作呢?

  1. subscriptions = subscriptionsByEventType.get(eventClass);首先通过这行代码,从subscriptionsByEventType中,根据Event的类,获取所有的对应的subscriptions,需要注意的是,此段代码使用EventBus的实例进行同步,实际上是同步的订阅者的List
  2. 如果subscriptions==null 或者 subscriptions.size()==0,即如果不存在对应的subscriptions,那么则返回即可。
  3. 将Event依次发送给每一个Subscription
    1. 设置postingState.event = event,设置postingState.subscription = subscription;
    2. 调用方法postToSubscription(subscription, event, postingState.isMainThread);发送消息
    3. 恢复postingState默认状态为null
    4. 重复步骤3

可以看到,真正发送消息的工作还没有看到,在postToSubscription方法当中,好吧,那我们继续来学习这个方法都做了什么?

postToSubscription

源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 将event发送给对应的调用者
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case PostThread:
// 直接调用
invokeSubscriber(subscription, event);
break;
case MainThread:
if (isMainThread) {
// 如果post的发送线程是UI线程,那么则直接调用对应的方法即可
invokeSubscriber(subscription, event);
} else {
// 否则则发送到main线程中对应的Handler中
mainThreadPoster.enqueue(subscription, event);
}
break;
case BackgroundThread:
// 背景线程
if (isMainThread) {
// 如果当前工作在主线程,则直接压入背景Poster的队列
backgroundPoster.enqueue(subscription, event);
} else {
// 反之则直接调用
invokeSubscriber(subscription, event);
}
break;
case Async:
//直接压入异步Poster的队列
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

可以看到,这里是最终的调用订阅者Event处理方法的地方,针对ThreadMode的不同,postToSubscription方法采取了不同的策略

  • PostThread:这个是直接在当前线程上调用处理的方法,所以直接通过invokeSubscriber(subscription, event);调用对应的方法
  • MainThread:
    • 当前在主线程上:同PostThread一样,直接通过invokeSubscriber(subscription, event);反射调用对应的方法
    • 不在主线程上,则通过mainThreadPoster.enqueue(subscription, event);将Event压入队列等待处理
  • BackgroundThread:
    • 当前在主线程上:通过backgroundPoster.enqueue(subscription, event);将Event压入队列,等待处理
    • 当前不在主线程上:通过invokeSubscriber(subscription, event);反射调用对应的方法
  • Async:直接将Event压入队列asyncPoster.enqueue(subscription, event);

好了,到这里,我们基本上明白了Event在EventBus中数据是怎么传递的了,但每一个ThreadMode不同的处理方法我们还没有看,到底是怎么样的呢?

invokeSubscriber(Subscription subscription, Object event) 源代码

1
2
3
4
5
6
7
8
9
    try {
// 使用反射机制,调用对应的事件处理函数。
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

这个地方的代码比较简单了,其实就是将反射的调用包装了一下,不需要多说。 我们先来看一下MainThread的时候,其Poster的处理办法:

HandlerPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/\*
\* Copyright (C) 2012 Markus Junginger, greenrobot (http://greenrobot.de)
\*
\* Licensed under the Apache License, Version 2.0 (the "License");
\* you may not use this file except in compliance with the License.
\* You may obtain a copy of the License at
\*
\* http://www.apache.org/licenses/LICENSE-2.0
\*
\* Unless required by applicable law or agreed to in writing, software
\* distributed under the License is distributed on an "AS IS" BASIS,
\* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\* See the License for the specific language governing permissions and
\* limitations under the License.
*/
package de.greenrobot.event;

import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.os.SystemClock;

/\*\*
\* 主线程Poster,本质上为一个Handler
*
*/
final class HandlerPoster extends Handler {

// 维护一个PendingPostQueue的队列
private final PendingPostQueue queue;
// 不太懂
private final int maxMillisInsideHandleMessage;
// EventBus对象
private final EventBus eventBus;
// 标记本Handler是否空闲:true:忙,false:空闲
private boolean handlerActive;

HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

// 入队列
void enqueue(Subscription subscription, Object event) {
// 获取一个PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将pendingPost放入queue队列当中
queue.enqueue(pendingPost);
// 发送消息
if (!handlerActive) {
handlerActive = true;
// 通过Handler中的MessageQueue,将通知工作在某个线程(可能是main Thread,post thread,background thread,asnyc thread)处理消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

// 消息的处理,需要注意的是,该函数段是工作在Looper对应的线程之上的。
// 有个问题,如果event很快处理完成,那么这个时候是不需要rescheduled的,那么如果在该event处理过程当中,已经放入其他的消息,那么这个消息会在什么时候得到处理呢?
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
// 记录开始时间
long started = SystemClock.uptimeMillis();
while (true) {
// 从等待处理的队列当中获取一个PendingPost
PendingPost pendingPost = queue.poll();
// 判断获取到的pendingPost是否为null,如果null则是没有需要处理的event
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
// 再次处理,需要注意的是,该方法是同步的,跟谁同步的呢?是跟enqueue方法中的代码块同步,做什么用呢?
// 我理解的是,此处的代码主要是用于避免一种现象的发生,就是上面已经给Handler发送消息,但并未处理的时候。---> 但貌似又不是
// 这次是对的:就是等待前面的enqueue函数执行完成,以便于从中获取event进行处理,如果此时仍然为空,说明队列是空的,标记handlerActive为空,
// 这样的话,下次enqueue的时候,就可以直接通过sendMessage通知Handler立刻进行处理。
pendingPost = queue.poll();
// 如果再次从中获取数据,但为空,则说明handler不是Activie的了。
if (pendingPost == null) {
// 标记handler已经空闲
handlerActive = false;
return;
}
}
}
// eventBus调用订阅者的对应的方法
eventBus.invokeSubscriber(pendingPost);
// 工作做完,统计消耗时间
long timeInMethod = SystemClock.uptimeMillis() - started;
// 超时
if (timeInMethod >= maxMillisInsideHandleMessage) {
// 立刻尝试处理下一个消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
// 设置标记
rescheduled = true;
return;
}
}
} finally {
// 如果已经rescheduled,那么说明此时该handler已经在忙,否则则说明handler已经空闲。
handlerActive = rescheduled;
}
}
}

上面是我添加过注释的源代码,我们可以发现以下的特点:

  1. 主线程的Poster本质上是一个Handler,因此关键的一点就是,看Handler到底工作在哪个Looper上,通过EventBus的默认初始化代码

    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

    可以发现,其实主线程上的Poster就是一个工作在主线程上的Handler,那么剩下的就比较简单了。

  2. enqueue,入队列,其实就做了以下的事情:
    1. 同步保护,防止从多个线程同时发送消息的时候出现错误
    2. queue.enqueue(pendingPost);将需要处理的PendingPost压入队列
    3. 通过sendMessage(obtainMessage())将消息发送给Handler进行处理
  3. handleMessage(Message msg):消息处理的方法
    1. pendingPost = queue.poll();获取数据,如果为null,那么意味着没有数据可以处理,标记当前活动状态为false,那么下一次enqueue入列的时候,就可以直接通知handler进行数据处理
    2. 获取成功,则通过eventBus.invokeSubscriber(pendingPost);调用相应的方法进行处理
    3. 如果此次消息处理超时,则直接通过sendMessage(obtainMessage())进行下一次消息处理

BackgroundPoster

源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/\*
\* Copyright (C) 2012 Markus Junginger, greenrobot (http://greenrobot.de)
\*
\* Licensed under the Apache License, Version 2.0 (the "License");
\* you may not use this file except in compliance with the License.
\* You may obtain a copy of the License at
\*
\* http://www.apache.org/licenses/LICENSE-2.0
\*
\* Unless required by applicable law or agreed to in writing, software
\* distributed under the License is distributed on an "AS IS" BASIS,
\* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\* See the License for the specific language governing permissions and
\* limitations under the License.
*/
package de.greenrobot.event;

import android.util.Log;

/\*\*
\* Posts events in background.
\* 在后台线程当中处理events
\* @author Markus
*/
final class BackgroundPoster implements Runnable {

// 一个保存有PendingPost的队列
private final PendingPostQueue queue;
// 保持对EventBus的引用
private final EventBus eventBus;

// 看现在的BackgroundPoster是否正在处理event
private volatile boolean executorRunning;

BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
// 根据subscription和event构建PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将构建好的PendingPost加入到队列。
queue.enqueue(pendingPost);
if (!executorRunning) {
// 如果当前队列空闲,则设置其为忙,并通过EventBus的线程池执行该线程
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}

@Override
public void run() {
try {
try {
while (true) {
// 阻塞方法,从PendignPostQueue中获取一个PendingPost
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
// 原理同我们之前分析的mainPoster一样的,都是防止在加入的时候尝试取PendignPost而取不到,
// 代码到这里的时候,则保证如果要加入队列,工作已经完成的。
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 调用对应的订阅者方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
// 此时说明没有工作可做,因此释放该线程完成工作,设置标记为false。
executorRunning = false;
}
}

}

通过比较和Poster代码,发现其实现的原理不一样,但大体的机制基本一直:所有的Event都是缓存在PendingPostQueue当中,当enqueue的时候入队列,然后不同ThreadMode的Poster会以不同的方式处理相应的Event。 BackgroundPoster的Event的入列方式:

  1. 将PendingPost加入到队列当中
  2. 判断如果当前的executorRunning==false,即当前BackgroundPoster没有线程在处理
    1. 设置标记executorRunning=true
    2. 将该线程提交给EventBus默认的ExecutorService进行处理

BackgroundPoster的Event的处理方式的几个特点:

  1. 当开启一个BackgroundPoster之后,会一直处理所有的PendignPost直至所有的全部处理完成。
  2. 当使用queue.poll(1000)获取PendignPost,仍然没有取回之后,会进入同步保护块(避免此时有新的PendignPost加入队列,但该线程看不到),再次尝试,如果依然没有PendingPost,说明此时没有Event通过BackgroundPoster进行处理,线程可以安全退出。

此时,我们再来回顾使用EventBus中,关于BackgroundPoster的几个说明:

  1. BackgroundPoster只会顺序对Event进行处理,因此不适合并发的情况。

对比看完了BackgroundPoster,还需要继续学习一下AsyncPoster的使用

AsyncPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class AsyncPoster implements Runnable {

private final PendingPostQueue queue;
private final EventBus eventBus;

AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
// 与mainPoster和backgroundPoster相比,直接将PendingPost执行
eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
// 获取需要处理的PendingPost
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
// 调用相应的订阅者方法
eventBus.invokeSubscriber(pendingPost);
}

}

一对比,发现跟BackgroundPoster有两点有区别:

  • 入队列的时候,会直接将线程提交给ExecutorService()进行处理,不需要检查当前是否有AsyncPoster任务在执行
  • 每一个AsyncPoster任务只负责一个PendignPostQueue的处理。