说在前面
本文转自“天河聊技术”微信公众号
事件驱动模型设计是一种优雅的程序设计方式,实现有很多,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,还有guava的实现,今天介绍guava eventbus的源码实现,看过这篇文章你自己也可以实现实现一套了。
guava event源码解析
先上一个demo实现,了解车的原理之前先上去感受下
/** * 事件 * weifeng.jiang 2018-06-11 19:06 */public class HelloEvent { private String message; public HelloEvent(String message) { this.message = message; } public String getMessage() { return message; }}
/** * 订阅者 * weifeng.jiang 2018-06-11 19:11 */public class EventListener { @Subscribe public void listen(HelloEvent helloEvent){ System.out.println(helloEvent.getMessage()); }}
/** * 客户端 * weifeng.jiang 2018-06-11 19:12 */public class Main { public static void main(String[] args) {// 创建事件总线 EventBus eventBus = new EventBus("test");// 创建订阅者 EventListener listener = new EventListener();// 注册订阅者 eventBus.register(listener);// 发布事件 eventBus.post(new HelloEvent("asdasd")); eventBus.post(new HelloEvent("asdasdasdas")); eventBus.post(new HelloEvent("asdasdasdasd")); }}
实现原理架构图
怎么成为一个订阅者接受事件呢
接受事件的对象应有一个public方法,用@Subscribe注解标记这个方法,将接受事件对象传递给EventBus实例的register(Object)方法,参考图一和图三
怎么发布事件呢
只需要调用EventBus实例的post方法,参考图三
guava eventbus事件总线有两种,同步的实现EventBus,异步的实现AsyncEventBus,如果订阅者在接收事件后进行长时间的逻辑处理,比如和数据库交互,这时候就需要用异步事件了,如果是简单处理,同步实现就可以。
这里以EventBus事件总线同步实现为例进行源码解析。
成为订阅者的源码实现
和@Subscribe注解配合使用的还有一个@AllowConcurrentEvents注解,这个注解是可以允许事件并发的执行,看下创建订阅者对象的源码实现,如下
/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 为监听器上的方法创建一个订阅服务器。*/static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method);}
可以允许并发事件,在这个类中
@VisibleForTesting static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); } @Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { super.invokeSubscriberMethod(event); } } }}
执行事件的时候是同步实现。
事件总线订阅源码实现
com.google.common.eventbus.SubscriberRegistry#register
void register(Object listener) {// 查找所有订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map Multimap, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry , Collection > entry : listenerMethods.asMap().entrySet()) {// 获取事件类型 Class eventType = entry.getKey();// 获取这个事件类型的订阅者集合 Collection eventMethodsInListener = entry.getValue();// 从缓存中按事件类型查找订阅者集合 CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) {// 从缓存中取不到,更新缓存 CopyOnWriteArraySet newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的
private final ConcurrentMap, CopyOnWriteArraySet > subscribers = Maps.newConcurrentMap();
到这里,订阅者已经准备好了,准备接受事件了。
发布事件源码实现
com.google.common.eventbus.EventBus#post
public void post(Object event) {// 获取事件的订阅者集合 IteratoreventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) {// 转发事件 dispatcher.dispatch(event, eventSubscribers);// 如果不是死亡事件,重新包装成死亡事件重新发布 } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
IteratorgetSubscribers(Object event) {// 获取事件类型类的超类集合 ImmutableSet > eventTypes = flattenHierarchy(event.getClass()); List > subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class eventType : eventTypes) {// 获取事件类型的订阅者集合 CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }
事件转发器有三种实现
第一种是立即转发,实时性比较高,其他两种都是队列实现。
执行订阅方法都是异步实现
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } });}
说到最后
本次源码解析到这里,仅供参考。