事件总线 —— otto的bus和eventbus对比分析

因文章很快被人转载到一些其他网站,所以本人在此声明:
转载请标明转载出处:http://frodoking.github.io/2015/03/30/android-eventbus-otto-analysis/

##出现场景
为了简化并且更加高质量的在Activities, Fragments, Threads, Services等等之间的通信,同时解决组建之间的高耦合还能继续高效的通信。事件总线设计出现了。
总线,在计算机组成原理中遇到过io总线。总线的思路就是负责传递某种object到指定的地方。
在Android内置的Intent和BroadcastReceiver就是采用了类似事件总线的设计思路。这两者都可以起到跟事件总线类似的效果。注册广播接收器和单纯发一个intent就可以唤起其他组件,提醒其他组件更新,这是非常方便的,同时也是本文提到的两个开源方案所做不到的。但也有不好地方,它们内部的实现都需要 IPC,单从传递效率上来讲,可能并不太适合上层的组件间通信。本文章主要讨论的app内部组件间的通信。

##基本用法对比

EventBus三步骤
1.定义 events:

1
public class MessageEvent { /* Additional fields if needed */ }

2.注册订阅者:

1
2
eventBus.register(this);
public void onEvent(AnyEventType event) {/* Do something */};

3.发布时间events:

1
eventBus.post(event);

Otto四步骤
1.初始化bus

1
2
Bus bus = new Bus(); 
Bus bus2 = new Bus(ThreadEnforcer.MAIN);//主线程

这里可以指定@Subscribe和@Produce标注的回调方法所运行的线程,默认是在MainThread中执行。如果不关心在哪个线程执行,可以使用ThreadEnforcer.ANY,甚至可以使用自己实现的ThreadEnforcer接口。
2.订阅事件

1
2
3
@Subscribe public void answerAvailable(AnswerAvailableEvent event) {
// TODO: React to the event somehow!
}

注意subscribe方法接收的参数类型需要和post参数的类型一致或者是post参数类型的父类。
3.发布事件

1
bus.post(new AnswerAvailableEvent(42));

bus.register(this);
一旦调用了register方法,Otto就会通过反射去寻找所有带有@Subscribe或者@Produce注解的方法,并将这些方法缓存下来。只有在调用了register之后,该类里面标注了@Subscribe或者@Produce的方法才会在适当的时候被调用。另外,当不需要订阅事件的时候,可以调用unregister来取消订阅。
4.生产者
有时候当订阅某个事件的时候,希望能够获取当前的一个值,比如订阅位置变化事件的时候,希望能拿到当前的位置信息。Otto中@Produce正是扮演了这么一个生产者的角色。@Produce也是用于方法,并且这个方法的参数必须为空,返回值是你要订阅的事件的类型。

1
2
3
4
@Produce public AnswerAvailableEvent produceAnswer() {  
// Assuming 'lastAnswer' exists.
return new AnswerAvailableEvent(this.lastAnswer);
}

使用标签Produce之后,也需要调用bus.register()。调用了register方法之后,所有之前订阅AnswerAvailableEvent事件的方法都会被执行一次,参数就是produceAnswer方法的返回值,之后任何新的订阅了AnswerAvailableEvent事件的方法,也都会立即调用produceAnswer方法。

##观察者模式
概述
观察者模式有时被称作发布/订阅模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。

解决的问题
将一个系统分割成一个一些类相互协作的类有一个不好的副作用,那就是需要维护相关对象间的一致性。我们不希望为了维持一致性而使各类紧密耦合,这样会给维护、扩展和重用都带来不便。观察者就是解决这类的耦合关系的。

模式中的角色

  • 抽象主题(Subject):它把所有观察者对象的引用保存到一个聚集里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象。
  • 具体主题(ConcreteSubject):将有关状态存入具体观察者对象;在具体主题内部状态改变时,给所有登记过的观察者发出通知。
  • 抽象观察者(Observer):为所有的具体观察者定义一个接口,在得到主题通知时更新自己。
  • 具体观察者(ConcreteObserver):实现抽象观察者角色所要求的更新接口,以便使本身的状态与主题状态协调。

再来看看观察者模式的类图
观察者模式的类图
下面是本人关于该模式的具体实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public interface Subject {
void attach(Observer obs);
void detach(Observer obs);
void notifyObserver();
}

public class ConcreteSubject implements Subject {

private Vector<Observer> obsVector = new Vector<Observer>();

@Override
public void attach(Observer obs) {
obsVector.add(obs);
}

@Override
public void detach(Observer obs) {
obsVector.remove(obs);
}

@Override
public void notifyObserver() {
for (Observer o : obsVector) {
o.update();
}
}

public Enumeration<Observer> observers() {
return obsVector.elements();
}

public void change() {
notifyObserver();
}
}

public interface Observer {
void update();
}

public class ConcreteObserver implements Observer {

@Override
public void update() {
System.out.println("收到通知,并进行处理");
}
}

public class Client {
public static void main(String args[]) {
Subject subject = new ConcreteSubject();

Observer observer = new ConcreteObserver();

subject.attach(observer);
subject.notifyObserver();
}
}
```

##源码实现方式对比
整体描述完了eventbus和otto的使用方式以及采用的观察者模式后,在这里重点分析一下这两个项目是如何实现各自的订阅事件、发布事件以及多环境下的切换问题(这里切换主要是主线程还是非主线程以及同步与异步问题)

**事件订阅分析对比**
1、eventbus
在这里重点讲一下eventbus如何实现订阅的关键地方(EventBus.getDefault().register(this))

先来看一个非常重要的工具类**SubscriberMethodFinder.findSubscriberMethods(Class<?> subscriberClass)**

//工具类,查找订阅者下的所有方法
class SubscriberMethodFinder {
private static final String ON_EVENT_METHOD_NAME = “onEvent”;//这个字段非常的重要
List findSubscriberMethods(Class<?> subscriberClass) {
String key = subscriberClass.getName();
List subscriberMethods;
synchronized (methodCache) {
subscriberMethods = methodCache.get(key);
}
if (subscriberMethods != null) {
return subscriberMethods;
}
subscriberMethods = new ArrayList();
Class<?> clazz = subscriberClass;
HashSet eventTypesFound = new HashSet();
StringBuilder methodKeyBuilder = new StringBuilder();
while (clazz != null) {
String name = clazz.getName();
if (name.startsWith(“java.”) || name.startsWith(“javax.”) || name.startsWith(“android.”)) {
// Skip system classes, this just degrades performance
break;
}

        // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
        //得到所有的方法
        Method[] methods = clazz.getDeclaredMethods();
        for (Method method : methods) {
            String methodName = method.getName();
            //查看当前查找的class内部所有以onEvent字段开始的方法
            if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {
                int modifiers = method.getModifiers();
                //是否是public且非static和abstract方法,是否是一个参数。如果都复合,才进入封装的部分。
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        String modifierString = methodName.substring(ON_EVENT_METHOD_NAME.length());
                        ThreadMode threadMode;
                        //根据方法的后缀,来确定threadMode,threadMode是个枚举类型:就四种情况。
                        if (modifierString.length() == 0) {
                            threadMode = ThreadMode.PostThread;
                            //主线程(UI线程采用handler更新机制)
                        } else if (modifierString.equals("MainThread")) {
                            threadMode = ThreadMode.MainThread;
                            //后台线程池(队列式的一个接一个)
                        } else if (modifierString.equals("BackgroundThread")) {
                            threadMode = ThreadMode.BackgroundThread;
                            //后台线程池(异步的,跟background共享线程池)
                        } else if (modifierString.equals("Async")) {
                            threadMode = ThreadMode.Async;
                        } else {
                            if (skipMethodVerificationForClasses.containsKey(clazz)) {
                                continue;
                            } else {
                                throw new EventBusException("Illegal onEvent method, check for typos: " + method);
                            }
                        }
                        Class<?> eventType = parameterTypes[0];
                        methodKeyBuilder.setLength(0);
                        methodKeyBuilder.append(methodName);
                        methodKeyBuilder.append('>').append(eventType.getName());
                        String methodKey = methodKeyBuilder.toString();
                        if (eventTypesFound.add(methodKey)) {
                            // Only add if not already found in a sub class
                            //将method, threadMode, eventType传入构造了:new SubscriberMethod(method, threadMode, eventType)。添加到List,最终放回。
                            subscriberMethods.add(new SubscriberMethod(method, threadMode, eventType));
                        }
                    }
                } else if (!skipMethodVerificationForClasses.containsKey(clazz)) {
                    Log.d(EventBus.TAG, "Skipping method (not public, static or abstract): " + clazz + "."
                            + methodName);
                }
            }
        }
        //扫描所有的父类,不仅仅是当前类。
        clazz = clazz.getSuperclass();
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass + " has no public methods called "
                + ON_EVENT_METHOD_NAME);
    } else {
        synchronized (methodCache) {
            methodCache.put(key, subscriberMethods);
        }
        return subscriberMethods;
    }
}

}

1
2

EventBus注册方法

private synchronized void register(Object subscriber, boolean sticky, int priority) {
//找到需要订阅的方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
    for (SubscriberMethod subscriberMethod : subscriberMethods) {
        //发起订阅
        subscribe(subscriber, subscriberMethod, sticky, priority);
    }
}

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {

    Class<?> eventType = subscriberMethod.eventType;
    //根据subscriberMethod.eventType,去subscriptionsByEventType去查找一个CopyOnWriteArrayList<Subscription>,如果没有则创建。
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    //把传入的参数封装成了一个:Subscription(subscriber, subscriberMethod, priority)
    //这里的subscriptionsByEventType是个Map,key:eventType ; value:CopyOnWriteArrayList<Subscription> 
    //这个Map其实就是EventBus存储方法的地方
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<Subscription>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
    // subscriberMethod.method.setAccessible(true);
    //添加newSubscription;并且是按照优先级添加的。
    //可以看到,优先级越高,会插到在当前List的前面。
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    //根据subscriber存储它所有的eventType; 依然是map;key:subscriber ,value:List<eventType> ;主要用于isRegister的判断。
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<Class<?>>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    //判断sticky;如果为true,从stickyEvents中根据eventType去查找有没有stickyEvent,
    if (sticky) {
        Object stickyEvent;
        synchronized (stickyEvents) {
            stickyEvent = stickyEvents.get(eventType);
        }
        //如果有则立即发布去执行。stickyEvent其实就是我们post时的参数。
        if (stickyEvent != null) {
            // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
            // --> Strange corner case, which we don't take care of here.
            postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
        }
    }
}
1
2
3
4
5
6
7
8

2、otto
同样看一下otto的关于订阅的关键点(new Bus().register(Object obj))

类似于eventbus,otto也有一个非常重要的工具类
**AnnotatedHandlerFinder.findAllSubscribers(Object listener)**
**AnnotatedHandlerFinder.findAllProducers(Object listener)**
在下面的代码中主要以订阅者方法为重点展开

/* This implementation finds all methods marked with a {@link Subscribe} annotation. /
//这个方法实现了基于java的注解方式实现查找当前listener所有被标记了Subscribe的方法
static Map, Set> findAllSubscribers(Object listener) {
Class<?> listenerClass = listener.getClass();
Map, Set> handlersInMethod = new HashMap, Set>();

//检查cache中是否已经存在加入内存中的class
if (!SUBSCRIBERS_CACHE.containsKey(listenerClass)) {
  loadAnnotatedMethods(listenerClass);
}
Map<Class<?>, Set<Method>> methods = SUBSCRIBERS_CACHE.get(listenerClass);
if (!methods.isEmpty()) {
  for (Map.Entry<Class<?>, Set<Method>> e : methods.entrySet()) {
    Set<EventHandler> handlers = new HashSet<EventHandler>();
    for (Method m : e.getValue()) {
        //为每个方法添加一个Handler,对event的处理的一个包装
      handlers.add(new EventHandler(listener, m));
    }
    //按照class为key,handler为value存入
    handlersInMethod.put(e.getKey(), handlers);
  }
}

return handlersInMethod;

}

1
2

订阅者和生产者注入

public void register(Object object) {
if (object == null) {
throw new NullPointerException(“Object to register must not be null.”);
}
enforcer.enforce(this);

//查找生产者方法
Map<Class<?>, EventProducer> foundProducers = handlerFinder.findAllProducers(object);
for (Class<?> type : foundProducers.keySet()) {

  final EventProducer producer = foundProducers.get(type);
  EventProducer previousProducer = producersByType.putIfAbsent(type, producer);
  //checking if the previous producer existed
  if (previousProducer != null) {
    throw new IllegalArgumentException("Producer method for type " + type
      + " found on type " + producer.target.getClass()
      + ", but already registered by type " + previousProducer.target.getClass() + ".");
  }
  Set<EventHandler> handlers = handlersByType.get(type);
  if (handlers != null && !handlers.isEmpty()) {
    for (EventHandler handler : handlers) {
      dispatchProducerResultToHandler(handler, producer);
    }
  }
}

//查找所有的订阅者方法
Map, Set> foundHandlersMap = handlerFinder.findAllSubscribers(object);
for (Class<?> type : foundHandlersMap.keySet()) {
//按照type为class的key查找handler
Set handlers = handlersByType.get(type);
//如果没有那么就创建新的集合并存入到内存
if (handlers == null) {
//concurrent put if absent
Set handlersCreation = new CopyOnWriteArraySet();
handlers = handlersByType.putIfAbsent(type, handlersCreation);
if (handlers == null) {
handlers = handlersCreation;
}
}
//做一个检查是否注册进去了
final Set foundHandlers = foundHandlersMap.get(type);
if (!handlers.addAll(foundHandlers)) {
throw new IllegalArgumentException(“Object already registered.”);
}
}

//针对生产者的一个Dispatch 结果功能
for (Map.Entry<Class<?>, Set<EventHandler>> entry : foundHandlersMap.entrySet()) {
  Class<?> type = entry.getKey();
  EventProducer producer = producersByType.get(type);
  if (producer != null && producer.isValid()) {
    Set<EventHandler> foundHandlers = entry.getValue();
    for (EventHandler foundHandler : foundHandlers) {
    //查看是否可用,不可用即跳出
      if (!producer.isValid()) {
        break;
      }
      if (foundHandler.isValid()) {
      //可用则分发结果
        dispatchProducerResultToHandler(foundHandler, producer);
      }
    }
  }
}

}
```

对比
从上边的源码可以很明显看出,事件订阅的处理差别
1、eventbus是采用反射的方式对整个注册的类的所有方法进行扫描来完成注册;
2、otto采用了注解的方式完成注册;
3、共同的地方缓存所有注册并有可用性的检测。同时可以移除注册;
4、注册的共同点都是采用method方法进行一个集成。

事件发布
在发布的地方,其实差异性不大。都采用了遍历当前的注册表,通过key找到当前注册列表,然后发起Dispatch,调用method.inovke(xxx)方法完成通知。

事件发布不一样的地方eventbus采用了四种线程模式

  • PostThread //直接反射调用,在当前的线程直接调用该方法
  • MainThread //通过Handler去发送消息,然后执行
  • BackgroundThread //如果当前非UI线程,则直接调用;如果是UI线程,则将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去调用(一个接着一个去调用,中间使用了一个布尔型变量handlerActive进行控制)
  • Async //将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去调用;线程池与BackgroundThread用的是同一个(动态控制并发)

在otto更多使用场景应该就是在主线程中,因为它内部没有异步线程的场景。(也许是它自身的定位不一样,它就是为了解决UI的通信机制。所以出发点就是轻量级)
在代码中主要体现这一特色的地方就是在接口ThreadEnforcer以及内部的实现域ANY和MAIN。在MAIN内部有一个是否是主线程的检查,而ANY不做任何检查的事情。

为何不重复造轮子,这里给出了目前个人看到最详细的 EventBus 源码解析的连接。

##开发者角度使用对比
1、otto中从源码角度看,要在基类中注册事件是一件比较麻烦的事情。而Evenbus就比较友好(有网友反应如果父类中注册了总线,那么子类中必须实现一个onEvent*方法,否则程序就会崩掉。由于时间问题没进行验证这一点);
2、订阅的事件参数问题,eventbus对多参数不会抛出异常。而otto只允许接收一个参数,否则抛出RuntimeException;(其实这一点作为开源项目对代码的质量还是挺重要的)
3、从个人使用的角度来看,个人更加喜欢otto的特性。因为我只会用otto来简化UI的通信,其他的我并不需要;
4、另外,用java注解的方式来显示的标记订阅方法和生产者方法这非常的友好。至少对刚使用的开发者而言,能够清晰看到代码的思路;
5、不过对不同需求的人群来说,eventbus拓展能力和使用场景更加丰富。如果你的项目通信比较多,而且很复杂的时候;
6、eventbus定义必须onEvent开始的方法感觉还是挺别扭;
7、eventbut是不使用注解是因为注解在2.3之前的系统上会变得缓慢(这一点还需要求证一下);
8、在eventbus中有一个比较难受的地方是:在一个订阅者类中如果有两个同参数类型的接收函数,并且都要执行在主线程,那如何命名呢?由于EventBus只根据事件参数类型来判断接收函数,因此会导致两个函数都会被执行。这当然对开发者来说比较难受了,不过github上已经有人提出采用添加tag的方式来做标记扩展(AndroidEventBus
9、使用otto时候,Bus对象只有作为单例共享的时候才足够高效。

##结束语
任何一个框架都有自己的设计初衷,开发者必须明白每个框架的出发点才能更好的运用。也希望每个开发者能从自己项目的角度出发,只有适合自己的才是最好的。过于追求反而会适得其反。

分析源码在于掌握当前项目的整体思路和应用范围的一个调研,个人更倾向于把关键点说出来而非说把所有细节全部描绘清楚。