博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
EventBus 3.0+ 源码详解(史上最详细图文讲解)
阅读量:5844 次
发布时间:2019-06-18

本文共 34857 字,大约阅读时间需要 116 分钟。

本文的整体结构图

本文篇幅很长,建议收藏了找时间慢慢看

本文讲解的是 'org.greenrobot:eventbus:3.1.1' 最新版,和其他版本有细微差别,思想都是一样的。

EventBus 的简单示例

@Overrideprotected void onStart() {    super.onStart();    EventBus.getDefault().register(this);}@Overrideprotected void onStop() {    super.onStop();    EventBus.getDefault().unregister(this);}@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 2)public void onMessageEvent(MyBusEvent event) {    Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();}复制代码

EventBus 的创建是一个单例模式,我们就从 getDefault() 开始讲起吧。

一、EventBus 的创建(单例 + Builder 设计模式)

1.1、单例模式获取 EventBus 实例

EventBus 的创建是一个双重校验锁的单例模式。

public static EventBus getDefault() {    if (sDefaultBus == null) {        synchronized (EventBus.class) {            if (sDefaultBus == null) {                sDefaultBus = new EventBus();            }        }    }    return sDefaultBus;}复制代码

单例模式没什么好说的,我们都知道单例模式的构造函数是私有类型 private,但是 EventBus 的构造函数却是 public 类型。

**这样设计的目:**EventBus 在代码使用过程中不仅仅只有一条总线,还有其他的订阅总线,订阅者可以注册到不同的 EventBus 总线,然后通过不同的 EventBus 总线发送数据。

不同的 EventBus 发送的数据是相互隔离的,订阅者只能收到注册了该 EventBus 总线内事件,而不会收到别的 EventBus 事件总线的数据。这样的设计为后面的不同环境的线程切换创造了好的条件。

/**     * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a central bus, consider {
@link #getDefault()}. 创建一个新的 EventBus 实例,每个实例在 events 事件被发送的时候都是一个单独的领域,为了使用一个 事件总线,考虑用 getDefault() 构建。 */private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();public EventBus() { this(DEFAULT_BUILDER);}复制代码

1.2、Builder 模式构建 EventBus

这里的代码我们重点看一下 this(DEFAULT_BUILDER) 里面的 DEFAULT_BUILDER。DEFAULT_BUILDER 是一个 EventBusBuilder,从这里可以看出 EventBus 是通过 建造者模式进行构建的,接下来我们看下是如何构建的吧。

EventBus(EventBusBuilder builder) {    logger = builder.getLogger();        //Map
, CopyOnWriteArrayList
> subscriptionsByEventType subscriptionsByEventType = new HashMap<>(); //Map
>> typesBySubscriber typesBySubscriber = new HashMap<>(); //Map
, Object> stickyEvents stickyEvents = new ConcurrentHashMap<>(); /** 用于线程间调度 **/ mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null; backgroundPoster = new BackgroundPoster(this); asyncPoster = new AsyncPoster(this); //用于记录event生成索引 indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0; //对已经注解过的Method的查找器,会对所设定过 @Subscriber 注解的的方法查找相应的Event subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); //当调用事件处理函数发生异常是否需要打印Log logSubscriberExceptions = builder.logSubscriberExceptions; //当没有订阅者订阅这个消息的时候是否打印Log logNoSubscriberMessages = builder.logNoSubscriberMessages; //当调用事件处理函数,如果异常,是否需要发送Subscriber这个事件 sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; //当没有事件处理函数时,对事件处理是否需要发送sendNoSubscriberEvent这个标志 sendNoSubscriberEvent = builder.sendNoSubscriberEvent; //是否需要抛出SubscriberException throwSubscriberException = builder.throwSubscriberException; //与Event有继承关系的类是否都需要发送 eventInheritance = builder.eventInheritance; //线程池 Executors.newCachedThreadPool() executorService = builder.executorService;}复制代码

##二、EventBus 中几个重要的成员变量

2.1、EventBus 中重要的 3 个 HashMap。

####2.1.1、subscriptionsByEventType

Map<Class<?>, CopyOnWriteArrayList> subscriptionsByEventType 是以 Event 为 Key,Subscriber 为 Value,当发送这个 Event 时,可以在 Map 中找到相应的 Subscriber。

####2.1.2、typesBySubscriber

Map<Object, List<Class<?>>> typesBySubscriber 以 Subscriber 为 Key,以 Event 为 Value,当我们注册和反注册时,都会操作 typesBySubscriber 这个 Map。

####2.1.3、stickyEvents

Map<Class<?>, Object> stickyEvents 是管理 EventBus 的粘性事件,粘性事件的 Event 发送出去之后,即使负责接收的 Subscriber 没有注册,当注册之后,依然能收到该事件,和广播接受者的粘性事件类似。

###2.2、EventBus 中重要的有 3 个 Post。

  • mainThreadPoster:主线程的 Poster

    mainThreadSupport.createPoster(this) 返回的是HandlerPoster(eventBus, looper, 10),looper 是一个 MainThread 的 Looper。意味着这个 HandlerPoster 可能就是 Handler 的实现。

  • backgroundPoster:后台线程的 Poster

  • asyncPoster:异步线程的 Poster

2.2.1、mainThreadPoster # HandlerPoster

HandlerPoster 其实就是 Handler 的实现,内部维护了一个 PendingPostQueue 的消息队列,在 enqueue(Subscription subscription, Object event) 方法中不断从 pendingPostPool 的 ArrayList 缓存池中获取 PendingPost 添加到 PendingPostQueue 队列中,并将该 PendingPost 事件发送到 Handler 中处理。

在 handleMessage 中,通过一个 while 死循环,不断从 PendingPostQueue 中取出 PendingPost 出来执行,获取到 post 之后由 eventBus 通过该 post 查找相应的 Subscriber 处理事件。

while 退出的条件有两个

  1. 获取到的 PendingPost 为 null,即是 PendingPostQueue 已经没有消息可处理。
  2. 每个 PendingPost 在 Handler 中执行的时间超过了最大的执行时间。

HandlerPoster UML 类图

HandlerPoster 执行过程

public class HandlerPoster extends Handler implements Poster {    private final PendingPostQueue queue;//存放待执行的 Post Events 的事件队列    private final int maxMillisInsideHandleMessage;//Post 事件在 handleMessage 中执行的最大的时间值,超过这个时间值则会抛异常    private final EventBus eventBus;    private boolean handlerActive;//标识 Handler 是否被运行起来   protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {        super(looper);        this.eventBus = eventBus;        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;        queue = new PendingPostQueue();    }    public void enqueue(Subscription subscription, Object event) {        //从 pendingPostPool 的 ArrayList 缓存池中获取 PendingPost 添加到 PendingPostQueue 队列中,并将该 PendingPost 事件发送到 Handler 中处理。        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        synchronized (this) {            queue.enqueue(pendingPost);//添加到队列中            if (!handlerActive) {
//标记 Handler 为活跃状态 handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) {
//死循环,不断从 PendingPost 队列中取出 post 事件执行 PendingPost pendingPost = queue.poll(); if (pendingPost == null) {
//如果为 null,表示队列中没有 post 事件,此时标记 Handelr 关闭,并退出 while 循环 synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; //标记 Handler 为非活跃状态 return; } } } //获取到 post 之后由 eventBus 通过该 post 查找相应的 Subscriber 处理事件 eventBus.invokeSubscriber(pendingPost); //计算每个事件在 handleMessage 中执行的时间 long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }}final class PendingPost { //通过ArrayList来实现PendingPost的添加和删除 private final static List
pendingPostPool = new ArrayList
(); Object event; Subscription subscription; PendingPost next; private PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; } //获取 PendingPost static PendingPost obtainPendingPost(Subscription subscription, Object event) { synchronized (pendingPostPool) { int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.event = event; pendingPost.subscription = subscription; pendingPost.next = null; return pendingPost; } } return new PendingPost(event, subscription); } //释放 PendingPost static void releasePendingPost(PendingPost pendingPost) { pendingPost.event = null; pendingPost.subscription = null; pendingPost.next = null; synchronized (pendingPostPool) { // Don't let the pool grow indefinitely if (pendingPostPool.size() < 10000) { pendingPostPool.add(pendingPost); } } }}复制代码

2.2.2、backgroundPoster # BackGroundPoster

BackgroundPoster 实现了 Runnable 和 Poster,enqueue() 和 HandlerPoster 中实现一样,在上文中已经讲过,这里不再赘述。

我们来看下 run() 方法中的实现,不断从 PendingPostQueue 中取出 pendingPost 到 EventBus 中分发,这里注意外部是 while() 死循环,意味着 PendingPostQueue 中所有的 pendingPost 都将分发出去。而 AsyncPoster 只是取出一个。

final class BackgroundPoster implements Runnable, Poster {    private final PendingPostQueue queue;    private final EventBus eventBus;    private volatile boolean executorRunning;    BackgroundPoster(EventBus eventBus) {        this.eventBus = eventBus;        queue = new PendingPostQueue();    }    public void enqueue(Subscription subscription, Object event) {        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        synchronized (this) {            queue.enqueue(pendingPost);//添加到队列中            if (!executorRunning) {                executorRunning = true;                //在线程池中执行这个 pendingPost                eventBus.getExecutorService().execute(this);//Executors.newCachedThreadPool()            }        }    }    @Override    public void run() {        try {            try {                //不断循环从 PendingPostQueue 取出 pendingPost 到 eventBus 执行                while (true) {                    //在 1000 毫秒内从 PendingPostQueue 中获取 pendingPost                    PendingPost pendingPost = queue.poll(1000);                    //双重校验锁判断 pendingPost 是否为 null                    if (pendingPost == null) {                        synchronized (this) {                            // Check again, this time in synchronized                            pendingPost = queue.poll();//再次尝试获取 pendingPost                            if (pendingPost == null) {                                executorRunning = false;                                return;                            }                        }                    }                    //将 pendingPost 通过 EventBus 分发出去                   //这里会将PendingPostQueue中【所有】的pendingPost都会分发,这里区别于AsyncPoster                    eventBus.invokeSubscriber(pendingPost);                }            } catch (InterruptedException e) {                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);            }        } finally {            executorRunning = false;        }    }}复制代码

2.2.3、asyncPoster # AsyncPoster

class AsyncPoster implements Runnable, Poster {    private final PendingPostQueue queue;    private final EventBus eventBus;    AsyncPoster(EventBus eventBus) {        this.eventBus = eventBus;        queue = new PendingPostQueue();    }    public void enqueue(Subscription subscription, Object event) {        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        queue.enqueue(pendingPost);        eventBus.getExecutorService().execute(this);//Executors.newCachedThreadPool()    }    @Override    public void run() {        PendingPost pendingPost = queue.poll();        if(pendingPost == null) {            throw new IllegalStateException("No pending post available");        }        eventBus.invokeSubscriber(pendingPost);    }}复制代码

三、@Subscribe 的注解

当我们指定订阅方法的时候,会在方法上加上注解,如下,下面我们看看这个注解的具体含义

@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 2)public void onMessageEvent(EventTest event) {    }@Documented //命名为 java doc 文档@Retention(RetentionPolicy.RUNTIME) //指定在运行时有效,即在运行时能保持这个 Subscribe@Target({ElementType.METHOD}) //指定类型为 METHOD,表名用来描述方法public @interface Subscribe {    //指定线程模式,可以指定在 Subscribe 中接收的 Event 所处的线程    ThreadMode threadMode() default ThreadMode.POSTING;    boolean sticky() default false;    int priority() default 0;}复制代码

3.1、ThreadMode

public enum ThreadMode {    POSTING,//EventBus 默认的线程模式    MAIN,//主线程    MAIN_ORDERED,//主线程    BACKGROUND,//后台线程    ASYNC//异步线程}复制代码

ThreadMode 是 enum(枚举)类型,threadMode 默认值是 POSTING。3.1.1 版本的 EventBus 新增了一种类型,共 5 种,以前的版本是 4 种。

  • POSTING:事件发布在什么线程,就在什么线程订阅。故它不需要切换线程来分发事件,因此开销最小。它要求 task 完成的要快,不能请求 MainThread,适用简单的 task。
  • MAIN:无论事件在什么线程发布,都在主线程订阅。事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。
  • MAIN_ORDERED:无论事件在什么线程发布,都在主线程订阅。区别于 MAIN,MAIN_ORDERED 事件将始终排队等待交付。这将确保 post 调用是非阻塞的。
  • BACKGROUND:如果发布的线程不是主线程,则在该线程订阅,如果是主线程,则使用一个单独的后台线程订阅。
  • ASYNC:在非主线程和发布线程中订阅。当处理事件的 Method 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。

3.2、sticky 粘性事件

粘性事件是事件消费者在事件发布之后才注册,依然能接收到该事件的特殊类型。

StickyEvent 与普通 Event的 普通就在于,EventBus 会自动维护被作为 StickyEvent 被 post 出来(即在发布事件时使用 EventBus.getDefault().postSticky(new MyEvent()) 方法)的事件的最后一个副本在缓存中。 任何时候在任何一个订阅了该事件的订阅者中的任何地方,都可以通 EventBus.getDefault().getStickyEvent(MyEvent.class)来取得该类型事件的最后一次缓存。

sticky(粘性)默认值是 false,如果是 true,那么可以通过 EventBus 的 postSticky 方法分发最近的粘性事件给该订阅者(前提是该事件可获得)。

3.3、Priority

priority 是 Method 的优先级,优先级高的可以先获得分发事件的权利。这个不会影响不同的 ThreadMode 的分发事件顺序。

四、register 注册

  1. 通过反射获取到订阅者的 Class 对象。
  2. 通过 Class 对象找到对应的订阅者方法集合。
  3. 遍历订阅者方法集合,将订阅者和订阅者方法订阅起来。

register() 接收的参数为 Object 类型的订阅者,通常也就是代码中 Activity 和 Fragment 的实例 this。EventBus 通过 getDefault() 来获取实例,当我们每新建一个 EventBus 总线,它的发布和订阅事件都是相互隔离的,EventBus 如何做到发布和订阅相互隔离呢?我们看下 register 的实现。

public void register(Object subscriber) {    //1. 通过反射获取到订阅者的Class对象    Class
subscriberClass = subscriber.getClass(); //2. 通过Class对象找到对应的订阅者方法集合 List
subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); //3. 遍历订阅者方法集合,将订阅者和订阅者方法订阅起来。 synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } }}复制代码

在 1 中没什么可细讲的,接下来看下第 2 步中的 SubscriberMethodFinder 这个类的 findSubscriberMethods() 方法。

subscriberMethodFinder 是 EventBus 的一个成员,可以看作是一个订阅方法查找器,是在EventBus 构造方法通过 EventBusBuilder 的一些参数构造出来的。

调用 findSubscriberMethods 方法,传入订阅者的 Class 对象,字面意思是找出订阅者中所有的订阅方法,用一个 List 集合来接收。

4.1、SubscriberMethodFinder # findSubscriberMethods() 详解

//订阅者的 Class 对象为 key,订阅者中的订阅方法 List 为 valueprivate static final Map
, List
> METHOD_CACHE = new ConcurrentHashMap<>();List
findSubscriberMethods(Class
subscriberClass) { //1. 首先在 METHOD_CACHE 中查找该 Event 对应的订阅者集合是否已经存在,如果有直接返回 List
subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } //2. 根据订阅者类 subscriberClass 查找相应的订阅方法 if (ignoreGeneratedIndex) {
//是否忽略生成 index //通过反射获取 subscriberMethods = findUsingReflection(subscriberClass); } else { //通过 SubscriberIndex 方式获取 subscriberMethods = findUsingInfo(subscriberClass); } //若订阅者中没有订阅方法,则抛异常 if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { // 缓存订阅者的订阅方法 List METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; }}//----------------------------SubscriberMethod----------------------------//封装了EventBus中的参数,就是一个EventBus订阅方法包装类public class SubscriberMethod { final Method method; final ThreadMode threadMode; final Class
eventType; final int priority; final boolean sticky; String methodString; //.....涉及到的方法后文再讲}复制代码

METHOD_CACHE 是一个 ConcurrentHashMap,以订阅者的 Class 对象为 key,订阅者中的订阅方法 List 为 value,缓存了注册过的订阅方法。

如果有缓存则返回返回缓存,如果没有则继续往下执行。这里看到 ignoreGeneratedIndex 这个属性,意思为是否忽略生成 index,是在构造 SubscriberMethodFinder 通过 EventBusBuilder 的同名属性赋值的,默认为 false,当为 true 时,表示以反射的方式获取订阅者中的订阅方法,当为 false 时,则以 Subscriber Index 的方式获取。接下来分别分析这两种方式。

####4.1.1、findUsingReflection() 方法解析

当 ignoreGeneratedIndex 为 true 时 --> findUsingReflection()

private List
findUsingReflection(Class
subscriberClass) { // 创建并初始化 FindState 对象 FindState findState = prepareFindState(); // findState 与 subscriberClass 关联 findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { // 使用反射的方式获取单个类的订阅方法 findUsingReflectionInSingleClass(findState); // 使 findState.clazz 指向父类的 Class,继续获取 findState.moveToSuperclass(); } // 返回订阅者及其父类的订阅方法 List,并释放资源 return getMethodsAndRelease(findState);}复制代码

####4.1.2、findUsingInfo() 方法解析

当 ignoreGeneratedIndex 为 false 时 --> findUsingInfo()

跟反射方式的 findUsingReflection 的首尾有点类似,不同的是它是通过 SubscriberInfo 这个类来获取订阅方法的,那么 SubscriberInfo 对象是怎么获取的呢,那么同样只看关键代码:getSubscriberInfo()

private List
findUsingInfo(Class
subscriberClass) { //1.通过 prepareFindState 获取到 FindState(保存找到的注解过的方法的状态) FindState findState = prepareFindState(); //2.findState 与 subscriberClass 关联 findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { //获取订阅者信息 //通过 SubscriberIndex 获取 findState.clazz 对应的 SubscriberInfo findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { // 逐个添加进 findState.subscriberMethods findState.subscriberMethods.add(subscriberMethod); } } } else { // 使用反射的方式获取单个类的订阅方法 findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState);}复制代码

FindState 封装了所有的订阅者和订阅方法的集合。

static class FindState {    //保存所有订阅方法    final List
subscriberMethods = new ArrayList<>(); //事件类型为Key,订阅方法为Value final Map
anyMethodByEventType = new HashMap<>(); //订阅方法为Key,订阅者的Class对象为Value final Map
subscriberClassByMethodKey = new HashMap<>(); final StringBuilder methodKeyBuilder = new StringBuilder(128); Class
subscriberClass; Class
clazz; boolean skipSuperClasses; SubscriberInfo subscriberInfo; //......}复制代码

通过 prepareFindState 获取到 FindState 对象,根据 FindState 对象可以进行下一步判断,

private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];private FindState prepareFindState() {    //找到 FIND_STATE_POOL 对象池    synchronized (FIND_STATE_POOL) {        for (int i = 0; i < POOL_SIZE; i++) {            //当找到了对应的FindState            FindState state = FIND_STATE_POOL[i];            if (state != null) {
//FindState 非空表示已经找到 FIND_STATE_POOL[i] = null; //清空找到的这个FindState,为了下一次能接着复用这个FIND_STATE_POOL池 return state;//返回该 FindState } } } //如果依然没找到,则创建一个新的 FindState return new FindState();}复制代码

####4.1.3、findUsingReflectionInSingleClass() 方法解析

通过反射找到对应的方法,并进行过滤

private void findUsingReflectionInSingleClass(FindState findState) {    Method[] methods;    try {        // This is faster than getMethods, especially when subscribers are fat classes like Activities        methods = findState.clazz.getDeclaredMethods();    } catch (Throwable th) {        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149        methods = findState.clazz.getMethods();        findState.skipSuperClasses = true;    }    for (Method method : methods) {        int modifiers = method.getModifiers();        //忽略非 public 和 static 的方法        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {            // 获取订阅方法的所有参数            Class
[] parameterTypes = method.getParameterTypes(); // 订阅方法只能有一个参数,否则忽略 if (parameterTypes.length == 1) { // 获取有 Subscribe 的注解 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { // 获取第一个参数 Class
eventType = parameterTypes[0]; // 检查 eventType 决定是否订阅,通常订阅者不能有多个 eventType 相同的订阅方法 if (findState.checkAdd(method, eventType)) { // 获取线程模式 ThreadMode threadMode = subscribeAnnotation.threadMode(); // 添加订阅方法进 List findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } }}复制代码

经过修饰符、参数个数、是否有注解、和订阅者是否有 eventType 相同的方法几层条件的筛选,最终将订阅方法添加进 findState 的 subscriberMethods 这个 List 中。

EventBus 不仅仅获取当前类的订阅方法,还会获取它所有父类的订阅方法。

在 EventBus 中,一个订阅者包括这个订阅者的所有父类和子类,不会有多个方法相同的去接收同一个事件,但是有可能出现这样一种情况,子类去订阅了该事件,父类也去订阅了该事件。当出现这种情况,EventBus 如何判断?通过调用 checkAddWithMethodSignature() 方法,根据方法签名来检查。

####4.1.4、checkAdd() & checkAddWithMethodSignature() 方法解析

boolean checkAdd(Method method, Class
eventType) { //事件类型为Key,订阅方法为Value final Map
anyMethodByEventType = new HashMap<>(); //put()方法执行之后,返回的是之前put的值 Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { // Paranoia check throw new IllegalStateException(); } // Put any non-Method object to "consume" the existing Method anyMethodByEventType.put(eventType, this); } //根据方法签名来检查 return checkAddWithMethodSignature(method, eventType); }}//订阅方法为Key,订阅者的Class对象为Valuefinal Map
subscriberClassByMethodKey = new HashMap<>();private boolean checkAddWithMethodSignature(Method method, Class
eventType) { methodKeyBuilder.setLength(0); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); Class
methodClass = method.getDeclaringClass(); //put方法返回的是put之前的对象 Class
methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); //如果methodClassOld不存在或者是methodClassOld的父类的话,则表明是它的父类,直接返回true。否则,就表明在它的子类中也找到了相应的订阅,执行的 put 操作是一个 revert 操作,put 进去的是 methodClassOld,而不是 methodClass if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { // Only add if not already found in a sub class return true; } else { //这里是一个revert操作,所以如果找到了它的子类也订阅了该方法,则不允许父类和子类都同时订阅该事件,put 的是之前的那个 methodClassOld,就是将以前的那个 methodClassOld 存入 HashMap 去覆盖相同的订阅者。 //不允许出现一个订阅者有多个相同方法订阅同一个事件 // Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; }}复制代码

FindState 的流程

  1. 调用 prepareFindState(),从 FIND_STATE_POOL 中获取一个 FindState,如果没有找到,则创建一个新的 FindState。
  2. 将 subscriberClass 通过 FindState 进行初始化。
  3. 返回所有的订阅者的方法和集合。

到此,两种方式讲解完毕。无论通过哪种方式获取,获取到订阅方法 List 之后,接下来是真正订阅的过程,回到register() 中看代码。

###4.2、subscribe 订阅

public void register(Object subscriber) {    Class
subscriberClass = subscriber.getClass(); List
subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { //迭代每个 Subscribe 方法,调用 subscribe() 传入 subscriber(订阅者) 和 subscriberMethod(订阅方法) 完成订阅, for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } }}复制代码

private final Map
, CopyOnWriteArrayList
> subscriptionsByEventType;private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class
eventType = subscriberMethod.eventType; // 创建 Subscription 封装订阅者和订阅方法信息 Subscription newSubscription = new Subscription(subscriber, subscriberMethod); //可并发读写的ArrayList,key为EventType,value为Subscriptions //根据事件类型从 subscriptionsByEventType 这个 Map 中获取 Subscription 集合 CopyOnWriteArrayList
subscriptions = subscriptionsByEventType.get(eventType); //如果为 null,表示还没有订阅过,创建并 put 进 Map if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //若subscriptions中已经包含newSubscription,表示该newSubscription已经被订阅过,抛出异常 if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // 按照优先级插入subscriptions int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } //key为订阅者,value为eventType,用来存放订阅者中的事件类型 //private final Map
>> typesBySubscriber; List
> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } //将EventType放入subscribedEvents的集合中 subscribedEvents.add(eventType); //判断是否为Sticky事件 if (subscriberMethod.sticky) { //判断是否设置了事件继承 if (eventInheritance) { //获取到所有Sticky事件的Set集合 Set
, Object>> entries = stickyEvents.entrySet(); //遍历所有Sticky事件 for (Map.Entry
, Object> entry : entries) { Class
candidateEventType = entry.getKey(); //判断当前事件类型是否为黏性事件或者其子类 if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); // 执行设置了 sticky 模式的订阅方法 checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } }}复制代码

4.3、register 流程图

事件的订阅讲解到这里,接下来看看事件的分发 Post。

五、Post 分发

###5.1、Post 方法解析

一般的事件发布方式

EventBus.getDefault().post(new EventTest());复制代码

接下来看看 Post 方法的具体代码。

//currentPostingThreadState 线程独有的private final ThreadLocal
currentPostingThreadState = new ThreadLocal
() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } };public void post(Object event) { //获取当前线程的 posting 状态 PostingThreadState postingState = currentPostingThreadState.get(); //获取当前事件队列 List
eventQueue = postingState.eventQueue; //将事件添加进当前线程的事件队列 eventQueue.add(event); //判断是否正在posting if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true; //如果已经取消,则抛出异常 if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) {
//发送事件 postSingleEvent(eventQueue.remove(0), postingState); } } finally { //状态复原 postingState.isPosting = false; postingState.isMainThread = false; } }}//发送事件的线程封装类final static class PostingThreadState { final List eventQueue = new ArrayList<>();//事件队列 boolean isPosting;//是否正在 posting boolean isMainThread;//是否为主线程 Subscription subscription; Object event; boolean canceled;//是否已经取消}复制代码

EventBus 用 ThreadLocal 存储每个线程的 PostingThreadState,一个存储了事件发布状态的类,当 post 一个事件时,添加到事件队列末尾,等待前面的事件发布完毕后再拿出来发布,这里看事件发布的关键代码postSingleEvent()。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {   Class
eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { //查找到所有继承关系的事件类型,将该类的父类全部放入集合中 List
> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class
clazz = eventTypes.get(h); //判断是否找到订阅者订阅了该事件 subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { //发送没有订阅者订阅该事件 post(new NoSubscriberEvent(this, event)); } }}复制代码

代码也非常简单,首先看 eventInheritance 这个属性,是否开启事件继承,若是,找出发布事件的所有父类,也就是 lookupAllEventTypes(),然后遍历每个事件类型进行发布。若不是,则直接发布该事件。

如果需要发布的事件没有找到任何匹配的订阅信息,则发布一个 NoSubscriberEvent 事件。这里只看发布事件的关键代码 postSingleEventForEventType()。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class
eventClass) { CopyOnWriteArrayList
subscriptions; //获取到 Subscription 的集合 synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { //调用 postToSubscription 发送 postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false;}复制代码

来到这里,开始根据事件类型匹配出订阅信息,如果该事件有订阅信息,则执行 postToSubscription(),发布事件到每个订阅者,返回 true,若没有,则返回 false。继续追踪发布事件到具体订阅者的代码 postToSubscription()。

###5.2、postToSubscription() 方法解析

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {    switch (subscription.subscriberMethod.threadMode) {        //订阅线程跟随发布线程,EventBus 默认的订阅方式        case POSTING:            // 订阅线程和发布线程相同,直接订阅            invokeSubscriber(subscription, event);            break;        // 订阅线程为主线程        case MAIN:            //如果在 UI 线程,直接调用 invokeSubscriber            if (isMainThread) {                invokeSubscriber(subscription, event);            } else {     //如果不在 UI 线程,用 mainThreadPoster 进行调度,即上文讲述的 HandlerPoster 的 Handler 异步处理,将订阅线程切换到主线程订阅                mainThreadPoster.enqueue(subscription, event);            }            break;        // 订阅线程为主线程        case MAIN_ORDERED:            if (mainThreadPoster != null) {                mainThreadPoster.enqueue(subscription, event);            } else {                // temporary: technically not correct as poster not decoupled from subscriber                invokeSubscriber(subscription, event);            }            break;        // 订阅线程为后台线程        case BACKGROUND:            //如果在 UI 线程,则将 subscription 添加到后台线程的线程池            if (isMainThread) {                backgroundPoster.enqueue(subscription, event);            } else {            //不在UI线程,直接分发                invokeSubscriber(subscription, event);            }            break;        // 订阅线程为异步线程        case ASYNC:            // 使用线程池线程订阅            asyncPoster.enqueue(subscription, event);            break;        default:            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);    }}复制代码

订阅者五种线程模式的特点对应的就是以上代码,简单来讲就是订阅者指定了在哪个线程订阅事件,无论发布者在哪个线程,它都会将事件发布到订阅者指定的线程。这里我们继续看实现订阅者的方法。

void invokeSubscriber(Subscription subscription, Object event) {    try {        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);    } catch (InvocationTargetException e) {        handleSubscriberException(subscription, event, e.getCause());    } catch (IllegalAccessException e) {        throw new IllegalStateException("Unexpected exception", e);    }}复制代码

订阅者接收到了事件,调用订阅方法,传入发布的事件作为参数,至此,事件发布过程就结束了。

5.3、post 流程图

六、unregister 反注册

先看反注册的代码

EventBus.getDefault().unregister(this);复制代码

跟踪 unregister()

public synchronized void unregister(Object subscriber) {    List
> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null) { for (Class
eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); }}复制代码

注册过程我们就知道 typesBySubscriber 是保存订阅者的所有订阅事件类型的一个 Map,这里根据订阅者拿到订阅事件类型 List,然后逐个取消订阅,最后 typesBySubscriber 移除该订阅者,。这里只需要关注它是如果取消订阅的,跟踪 unsubscribeByEventType()。

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */private void unsubscribeByEventType(Object subscriber, Class
eventType) { List
subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false; subscriptions.remove(i); i--; size--; } } }}复制代码

subscriptionsByEventType 是存储事件类型对应订阅信息的 Map,代码逻辑非常清晰,找出某事件类型的订阅信息 List,遍历订阅信息,将要取消订阅的订阅者和订阅信息封装的订阅者比对,如果是同一个,则说明该订阅信息是将要失效的,于是将该订阅信息移除。

七、总结

回顾一下 EventBus 的三个步骤

  1. 注册订阅者
  2. 事件发布
  3. 反注册订阅者

好了,EventBus 的源码解析到这就结束了,想进一步了解 EventBus 的朋友可以亲自去阅读源码。

转载地址:http://qnqcx.baihongyu.com/

你可能感兴趣的文章
RedisRepository封装—Redis发布订阅以及StackExchange.Redis中的使用
查看>>
sql-server-storage-internals
查看>>
Linux 使用 iptables屏蔽IP段
查看>>
李洪强经典面试题32
查看>>
mysql 添加列,修改列,删除列
查看>>
苹果装WIN 7
查看>>
Mininet实验 自定义拓扑结构
查看>>
datagrid鼠标悬浮提示
查看>>
Node.js大众点评爬虫
查看>>
Html5
查看>>
微信红包惊人秘密:谁最容易抢到大红包?
查看>>
Spark 概念学习系列之从物理执行的角度透视spark Job(十七)
查看>>
连接 insance 到 vlan101 - 每天5分钟玩转 OpenStack(97)
查看>>
sqlserver两种分页方法比较
查看>>
【python】面向对象编程
查看>>
分布式系统关注点——99%的人都能看懂的「熔断」以及最佳实践
查看>>
常见的 Web 应用攻击示例
查看>>
【43】学习处理模版化基类内的名称
查看>>
Tag:meta
查看>>
纯javaScript实现div层拖动/移位效果 推荐学习
查看>>