Contents
  1. The subscription pattern
  2. How EventBus works
    1. The Subscribe annotation
    2. The EventBus main class
    3. The annotation processor
  3. An open question
  4. References

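This post looks at the well-known open-source library EventBus.

The name says what it does: it is an event bus. When an event occurs, the bus delivers it to every receiver attached to the bus.
An event bus is really just a concept, or a pattern. greenrobot's EventBus project is one implementation of that pattern, and not the only one.
It is developed by greenrobot, an organization that is also known for another project, greenDAO.
This article is based on the EventBus 3.x source code.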

The subscription pattern

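Before getting into EventBus, we need to understand the subscriber pattern, also known as the observer pattern. As the name suggests, it fits situations where "when something happens, everyone who cares about it gets notified". To me it is essentially registering callbacks in bulk.
In the Java standard library the observer pattern appears as the java.util.Observable class together with the java.util.Observer interface; in EventBus the corresponding role is played by the subscriber.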
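To make the "callbacks registered in bulk" view concrete, here is a minimal sketch of the pattern. It is not taken from EventBus or from java.util.Observable; the class name NewsChannel and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical subject: keeps a list of callbacks and notifies all of them.
class NewsChannel {
    private final List<Consumer<String>> observers = new ArrayList<>();

    void subscribe(Consumer<String> observer) {
        observers.add(observer);
    }

    void unsubscribe(Consumer<String> observer) {
        observers.remove(observer);
    }

    void publish(String headline) {
        // Iterate over a copy so an observer may unsubscribe while being notified.
        for (Consumer<String> observer : new ArrayList<>(observers)) {
            observer.accept(headline);
        }
    }
}

class ObserverDemo {
    public static void main(String[] args) {
        NewsChannel channel = new NewsChannel();
        List<String> received = new ArrayList<>();
        channel.subscribe(h -> received.add("A:" + h));
        channel.subscribe(h -> received.add("B:" + h));
        channel.publish("hello");
        System.out.println(received); // [A:hello, B:hello]
    }
}
```

Every observer registered before publish() is called receives the same headline, in registration order.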

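How EventBus works

The principle is very simple and the API is easy to imagine, but the actual implementation still takes some care.
We start from the API and look at how EventBus fulfils the role of an event bus.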


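1. Define an event:

public static class MessageEvent { /* Additional fields if needed */ }
An event is just a plain Java class; a nested event class may be static or non-static.

2. Prepare a subscriber. An annotation marks a method as a subscriber method, and its single parameter is the event we care about. A [thread mode](http://greenrobot.org/eventbus/documentation/delivery-threads-threadmode/) can be specified:

@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) { /* Do something */ }

Subscribers can then be registered and unregistered. On Android, for example: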

@Override
public void onStart() {
    super.onStart();
    EventBus.getDefault().register(this); // register this object as a subscriber on the EventBus
}

@Override
public void onStop() {
    super.onStop();
    EventBus.getDefault().unregister(this); // unregister this subscriber from the EventBus
}

3. Post an event:

EventBus.getDefault().post(new MessageEvent()); // post a new MessageEvent to the EventBus

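From this API we can make a few bold guesses:

  1. Subscriptions are probably stored as something like Map<EventType, Subscribers>. Since events and subscribers are in a many-to-many relationship, there should also be a structure like Map<Subscriber, EventTypes>;
  2. Registering and unregistering a subscriber maintains both data structures;
  3. Posting an event queries these data structures;
  4. Further attributes can be supported on top: thread mode, subscriber priority, event mode.

With that understanding, we could actually implement a simple EventBus framework ourselves.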
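The guesses above can be sketched as a toy bus. This is a rough illustration under my own assumptions, not EventBus's real implementation: the names TinyBus and Registration are invented, handlers are passed in explicitly instead of being discovered through annotations, and everything runs on the calling thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy bus built around the two maps guessed above.
class TinyBus {
    private static final class Registration {
        final Object subscriber;
        final Consumer<Object> handler;

        Registration(Object subscriber, Consumer<Object> handler) {
            this.subscriber = subscriber;
            this.handler = handler;
        }
    }

    // Map<EventType, Subscribers>
    private final Map<Class<?>, List<Registration>> registrationsByEventType = new HashMap<>();
    // Map<Subscriber, EventTypes>
    private final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();

    synchronized <T> void register(Object subscriber, Class<T> eventType, Consumer<T> handler) {
        registrationsByEventType.computeIfAbsent(eventType, k -> new ArrayList<>())
                .add(new Registration(subscriber, event -> handler.accept(eventType.cast(event))));
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    synchronized void unregister(Object subscriber) {
        List<Class<?>> types = typesBySubscriber.remove(subscriber);
        if (types == null) {
            return; // was never registered
        }
        for (Class<?> type : types) {
            List<Registration> regs = registrationsByEventType.get(type);
            if (regs != null) {
                regs.removeIf(r -> r.subscriber == subscriber);
            }
        }
    }

    void post(Object event) {
        List<Registration> snapshot;
        synchronized (this) {
            List<Registration> regs = registrationsByEventType.get(event.getClass());
            snapshot = regs == null ? new ArrayList<>() : new ArrayList<>(regs);
        }
        // Deliver outside the lock, on the caller's thread.
        for (Registration r : snapshot) {
            r.handler.accept(event);
        }
    }
}

class TinyBusDemo {
    public static void main(String[] args) {
        TinyBus bus = new TinyBus();
        Object subscriber = new Object();
        bus.register(subscriber, String.class, s -> System.out.println("got " + s));
        bus.post("hello");   // delivered
        bus.post(42);        // no handler for Integer, silently dropped
        bus.unregister(subscriber);
        bus.post("ignored"); // no longer delivered
    }
}
```

Note that in this sketch posting only consults the first map; the second map exists so that unregister() knows which event types to clean up.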

Now let's see how EventBus implements each of these features.

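The Subscribe annotation

The core of EventBus is the Subscribe annotation. Let's look at its implementation (if you are not familiar with annotations, leave a comment; if enough people ask, I will consider writing a tutorial on Java annotations).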

@Retention(RetentionPolicy.RUNTIME) // retained at runtime
@Target({ElementType.METHOD}) // applies to methods
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING; // thread mode: which thread the subscriber method is executed on

/**
* If true, delivers the most recent sticky event (posted with
* {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
*/
boolean sticky() default false; // whether the subscriber receives sticky events

/** Subscriber priority to influence the order of event delivery.
* Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
* others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
* delivery among subscribers with different {@link ThreadMode}s! */
int priority() default 0; // priority; only affects the order in which subscribers on the same delivery thread receive an event
}


/**
* Each subscriber method has a thread mode, which determines in which thread the method is to be called by EventBus.
* EventBus takes care of threading independently from the posting thread.
*
* @see EventBus#register(Object)
* @author Markus
*/
public enum ThreadMode {
/**
* Subscriber will be called directly in the same thread, which is posting the event. This is the default. Event delivery
* implies the least overhead because it avoids thread switching completely. Thus this is the recommended mode for
* simple tasks that are known to complete in a very short time without requiring the main thread. Event handlers
* using this mode must return quickly to avoid blocking the posting thread, which may be the main thread.
*/
POSTING, // called directly on the posting thread

/**
* On Android, subscriber will be called in Android's main thread (UI thread). If the posting thread is
* the main thread, subscriber methods will be called directly, blocking the posting thread. Otherwise the event
* is queued for delivery (non-blocking). Subscribers using this mode must return quickly to avoid blocking the main thread.
* If not on Android, behaves the same as {@link #POSTING}.
*/
MAIN, // on Android, delivered on the main thread; outside Android, behaves like POSTING

/**
* On Android, subscriber will be called in Android's main thread (UI thread). Different from {@link #MAIN},
* the event will always be queued for delivery. This ensures that the post call is non-blocking.
*/
MAIN_ORDERED, // on Android, delivered on the main thread, always through the queue, so the thread calling post() is never blocked

/**
* On Android, subscriber will be called in a background thread. If posting thread is not the main thread, subscriber methods
* will be called directly in the posting thread. If the posting thread is the main thread, EventBus uses a single
* background thread, that will deliver all its events sequentially. Subscribers using this mode should try to
* return quickly to avoid blocking the background thread. If not on Android, always uses a background thread.
*/
BACKGROUND, // on Android, runs on the posting thread if it is not the UI thread, otherwise on a background thread; outside Android, always on a background thread

/**
* Subscriber will be called in a separate thread. This is always independent from the posting thread and the
* main thread. Posting events never wait for subscriber methods using this mode. Subscriber methods should
* use this mode if their execution might take some time, e.g. for network access. Avoid triggering a large number
* of long running asynchronous subscriber methods at the same time to limit the number of concurrent threads. EventBus
* uses a thread pool to efficiently reuse threads from completed asynchronous subscriber notifications.
*/
ASYNC // always runs on a separate thread
}

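The code and comments above are fairly clear. With this annotation we can have a given method run on a given thread when a given event arrives. The annotation only marks the method with metadata, though. To see how the annotation is actually processed we have to look at other entry points.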
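As a rough idea of what "processing the annotation" can look like at runtime, the sketch below scans a class for annotated methods via reflection. It uses a stand-in annotation named Handles (hypothetical, so the example compiles without EventBus), and the scanning logic is my own simplification, not the code of SubscriberMethodFinder.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {
    int priority() default 0;
}

// Hypothetical subscriber class with one annotated and one plain method.
class ChatScreen {
    @Handles(priority = 1)
    public void onMessage(String message) { }

    public void notAHandler(String message) { }
}

class AnnotationScanner {
    // Describes every annotated single-parameter method as "name(EventType) priority=N".
    static List<String> describeHandlers(Class<?> subscriberClass) {
        List<String> found = new ArrayList<>();
        for (Method method : subscriberClass.getDeclaredMethods()) {
            Handles annotation = method.getAnnotation(Handles.class);
            if (annotation != null && method.getParameterCount() == 1) {
                found.add(method.getName() + "("
                        + method.getParameterTypes()[0].getSimpleName()
                        + ") priority=" + annotation.priority());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(describeHandlers(ChatScreen.class)); // [onMessage(String) priority=1]
    }
}
```

The RUNTIME retention policy is what makes getAnnotation() return a non-null value here; with a weaker retention the annotation would not be visible to reflection.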

The EventBus main class

The EventBus class is one of those entry points.

Full source: https://github.com/greenrobot/EventBus/blob/master/EventBus/src/org/greenrobot/eventbus/EventBus.java

/**
* The following comment explains the design idea behind EventBus.
* EventBus is a central publish/subscribe event system for Android. Events are posted ({@link #post(Object)}) to the
* bus, which delivers it to subscribers that have a matching handler method for the event type. To receive events,
* subscribers must register themselves to the bus using {@link #register(Object)}. Once registered, subscribers
* receive events until {@link #unregister(Object)} is called. Event handling methods must be annotated by
* {@link Subscribe}, must be public, return nothing (void), and have exactly one parameter
* (the event).
*
* @author Markus Junginger, greenrobot
*/
public class EventBus {

/** Log tag, apps may override it. */
public static String TAG = "EventBus";

static volatile EventBus defaultInstance;

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder(); // builder pattern
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>(); // keyed by the event class; caches that class together with its superclasses and interfaces

private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; // the Map<EventType, Subscribers> we expected to see
private final Map<Object, List<Class<?>>> typesBySubscriber; // the Map<Subscriber, EventTypes> we expected to see
private final Map<Class<?>, Object> stickyEvents;

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
}; // thread-local variable holding per-thread posting state

// @Nullable
private final MainThreadSupport mainThreadSupport;
// @Nullable
private final Poster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
private final SubscriberMethodFinder subscriberMethodFinder;
private final ExecutorService executorService;

private final boolean throwSubscriberException;
private final boolean logSubscriberExceptions;
private final boolean logNoSubscriberMessages;
private final boolean sendSubscriberExceptionEvent;
private final boolean sendNoSubscriberEvent;
private final boolean eventInheritance;

private final int indexCount;
private final Logger logger;

/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}

public static EventBusBuilder builder() {
return new EventBusBuilder();
}

/** For unit test primarily. */
public static void clearCaches() {
SubscriberMethodFinder.clearCaches();
eventTypesCache.clear();
}

/**
* Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
* central bus, consider {@link #getDefault()}.
*/
public EventBus() {
this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

/**
* Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
* are no longer interested in receiving events.
* <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
*/
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); // find the methods annotated with Subscribe in the subscriber's class
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod); // call subscribe() for each subscriber method to complete the registration
}
}
}

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { // why not simply declare this method synchronized?
Class<?> eventType = subscriberMethod.eventType; // the event type this method subscribes to
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); // check whether this event type already has subscribers; if not, create a new list
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) { // the list already contains this <EventType, Subscriber> pair: throw, because it is a duplicate registration
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { // insert the subscription into the list according to its priority
subscriptions.add(i, newSubscription);
break;
}
}

List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); // record this event type in the list of all event types handled by the subscriber
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) { // handle the sticky attribute
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}

/**
* Checks if the current thread is running in the main thread.
* If there is no main thread support (e.g. non-Android), "true" is always returned. In that case MAIN thread
* subscribers are always called in posting thread, and BACKGROUND subscribers are always called from a background
* poster.
*/
private boolean isMainThread() {
return mainThreadSupport != null ? mainThreadSupport.isMainThread() : true;
}

public synchronized boolean isRegistered(Object subscriber) {
return typesBySubscriber.containsKey(subscriber);
}

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) { // removes the given subscriber from all subscriptions of the event type; only maintains <EventType, Subscriber>
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); // look up the given subscriber
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType); // for each subscribed event type, update Map<EventType, Subscribers>
}
typesBySubscriber.remove(subscriber); // remove the subscriber from Map<Subscriber, EventTypes>
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event); // append the event to this thread's queue

if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true; // not already posting on this thread: enter the posting state and start the loop that drains the event queue
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

/**
* Called from a subscriber's event handling method, further event delivery will be canceled. Subsequent
* subscribers
* won't receive the event. Events are usually canceled by higher priority subscribers (see
* {@link Subscribe#priority()}). Canceling is restricted to event handling methods running in posting thread
* {@link ThreadMode#POSTING}.
*/
public void cancelEventDelivery(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
if (!postingState.isPosting) {
throw new EventBusException(
"This method may only be called from inside event handling methods on the posting thread");
} else if (event == null) {
throw new EventBusException("Event may not be null");
} else if (postingState.event != event) {
throw new EventBusException("Only the currently handled event may be aborted");
} else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
throw new EventBusException(" event handlers may only abort the incoming event");
}

postingState.canceled = true;
}

/**
* Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky
* event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.
*/
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}

/**
* Gets the most recent sticky event for the given type.
*
* @see #postSticky(Object)
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (stickyEvents) {
return eventType.cast(stickyEvents.get(eventType));
}
}

/**
* Remove and gets the recent sticky event for the given event type.
*
* @see #postSticky(Object)
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (stickyEvents) {
return eventType.cast(stickyEvents.remove(eventType));
}
}

/**
* Removes the sticky event if it equals to the given event.
*
* @return true if the events matched and the sticky event was removed.
*/
public boolean removeStickyEvent(Object event) {
synchronized (stickyEvents) {
Class<?> eventType = event.getClass();
Object existingEvent = stickyEvents.get(eventType);
if (event.equals(existingEvent)) {
stickyEvents.remove(eventType);
return true;
} else {
return false;
}
}
}

/**
* Removes all sticky events.
*/
public void removeAllStickyEvents() {
synchronized (stickyEvents) {
stickyEvents.clear();
}
}

public boolean hasSubscriberForEvent(Class<?> eventClass) { // checks whether the given event class has any subscribers
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
if (eventTypes != null) {
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(clazz);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
return true;
}
}
}
return false;
}

// Posts a single event
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); // look up all types related to the event class (superclasses and interfaces)
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); // post for each of these event classes
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event)); // post a "nobody subscribed to this event" event
}
}
}

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread); // deliver the event to the subscriber
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) { // if a subscriber called cancelEventDelivery, the remaining subscribers no longer receive the event
break;
}
}
return true;
}
return false;
}

// Dispatches the event to the appropriate thread according to the subscription's thread mode
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING: // invoke directly on the current thread
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) { // already on the main thread: invoke directly
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

/** Recurses through super interfaces. */
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}

/**
* Invokes the subscriber if the subscriptions is still active. Skipping subscriptions prevents race conditions
* between {@link #unregister(Object)} and event delivery. Otherwise the event might be delivered after the
* subscriber unregistered. This is particularly important for main thread delivery and registrations bound to the
* live cycle of an Activity or Fragment.
*/
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event); // invoke the subscriber's method via reflection
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
if (event instanceof SubscriberExceptionEvent) {
if (logSubscriberExceptions) {
// Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
+ " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
+ exEvent.causingSubscriber, exEvent.throwable);
}
} else {
if (throwSubscriberException) {
throw new EventBusException("Invoking subscriber failed", cause);
}
if (logSubscriberExceptions) {
logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
+ subscription.subscriber.getClass(), cause);
}
if (sendSubscriberExceptionEvent) {
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
subscription.subscriber);
post(exEvent);
}
}
}

/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}

ExecutorService getExecutorService() {
return executorService;
}

/**
* For internal use only.
*/
public Logger getLogger() {
return logger;
}

// Just an idea: we could provide a callback to post() to be notified, an alternative would be events, of course...
/* public */interface PostCallback {
void onPostCompleted(List<SubscriberExceptionEvent> exceptionEvents);
}

@Override
public String toString() {
return "EventBus[indexCount=" + indexCount + ", eventInheritance=" + eventInheritance + "]";
}
}

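With the comments added to the code above, the logic of the EventBus main class should be reasonably easy to follow. The other classes, such as Poster, EventBusBuilder, SubscriberMethod and SubscriberMethodFinder, are implementation details that I will not go into here; feel free to dig deeper if you are interested.
One thing to note in the code above: subscribers are registered on, and events are posted to, a specific EventBus object, so subscribers and events on different EventBus objects cannot see each other. The author's comment says that to use EventBus as a central event bus, you should obtain the EventBus object through EventBus.getDefault().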
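One part of the listing that is easy to skim past is event inheritance: when eventInheritance is enabled, an event is also delivered to subscribers of its superclasses and interfaces. The sketch below reproduces the traversal idea of lookupAllEventTypes without the cache. The event classes (Auditable, BaseEvent, LoginEvent) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event hierarchy used only for this illustration.
interface Auditable { }
class BaseEvent implements Auditable { }
class LoginEvent extends BaseEvent { }

class EventTypeLookup {
    // Same traversal idea as lookupAllEventTypes in the listing, minus the cache.
    static List<Class<?>> allEventTypes(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Class<?> c = eventClass; c != null; c = c.getSuperclass()) {
            types.add(c);
            addInterfaces(types, c.getInterfaces());
        }
        return types;
    }

    // Recurses through super-interfaces, skipping ones already collected.
    private static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> i : interfaces) {
            if (!types.contains(i)) {
                types.add(i);
                addInterfaces(types, i.getInterfaces());
            }
        }
    }

    public static void main(String[] args) {
        // Prints LoginEvent, BaseEvent, Auditable, Object (one per line).
        for (Class<?> type : allEventTypes(LoginEvent.class)) {
            System.out.println(type.getSimpleName());
        }
    }
}
```

So a subscriber method that takes Object as its parameter would, under this rule, see every event posted to the bus.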

The annotation processor

In version 3.0 the EventBus project added EventBusAnnotationProcessor, which processes the annotations at compile time to improve performance.

An open question

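According to the official EventBus documentation it is much faster than the otto library, but I do not fully understand why.
The points I can think of: it caches the SubscriberMethod lists it finds, so registering the same subscriber class again does not repeat the reflective scan for annotated methods (the call itself still goes through Method.invoke); and it stores subscriptions in a CopyOnWriteArrayList, so dispatch can iterate over them without holding a lock.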
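The CopyOnWriteArrayList point can be checked with a small experiment. The sketch below (class and method names are my own) removes an element while iterating, which simulates an unregister() arriving during delivery. The iterator works on a snapshot, so the loop finishes without locking and without a ConcurrentModificationException; the same loop over a plain ArrayList would throw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {
    // Iterates over the list, removing "b" on every step; returns what the loop visited.
    static List<String> visitWhileRemoving(List<String> subscribers) {
        List<String> visited = new ArrayList<>();
        for (String s : subscribers) {
            visited.add(s);
            subscribers.remove("b"); // simulates an unregister() during delivery
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> cow = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        System.out.println(visitWhileRemoving(cow)); // [a, b, c]
        System.out.println(cow);                     // [a, c]
    }
}
```

The loop still visits "b" after it was removed, which is presumably why the listing also keeps an active flag on each Subscription and checks it before invoking queued deliveries.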

References
