博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
guava eventbus源码解析
阅读量:5968 次
发布时间:2019-06-19

本文共 4909 字,大约阅读时间需要 16 分钟。

hot3.png

说在前面

本文转自“天河聊技术”微信公众号

事件驱动模型设计是一种优雅的程序设计方式,实现有很多,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,还有guava的实现,今天介绍guava eventbus的源码实现,看过这篇文章你自己也可以实现实现一套了。

 

guava event源码解析

先上一个demo实现,了解车的原理之前先上去感受下

/** * 事件 * weifeng.jiang 2018-06-11 19:06 */public class HelloEvent {    private String message;    public HelloEvent(String message) {        this.message = message;    }    public String getMessage() {        return message;    }}
/** * 订阅者 * weifeng.jiang 2018-06-11 19:11 */public class EventListener {    @Subscribe    public void listen(HelloEvent helloEvent){        System.out.println(helloEvent.getMessage());    }}
/** * 客户端 * weifeng.jiang 2018-06-11 19:12 */public class Main {    public static void main(String[] args) {//        创建事件总线        EventBus eventBus = new EventBus("test");//        创建订阅者        EventListener listener = new EventListener();//       注册订阅者        eventBus.register(listener);//        发布事件        eventBus.post(new HelloEvent("asdasd"));        eventBus.post(new HelloEvent("asdasdasdas"));        eventBus.post(new HelloEvent("asdasdasdasd"));    }}

实现原理架构图

怎么成为一个订阅者接受事件呢

接受事件的对象应有一个public方法,用@Subscribe注解标记这个方法,将接受事件对象传递给EventBus实例的register(Object)方法,参考图一和图三

 

怎么发布事件呢

只需要调用EventBus实例的post方法,参考图三

 

guava eventbus事件总线有两种,同步的实现EventBus,异步的实现AsyncEventBus,如果订阅者在接收事件后进行长时间的逻辑处理,比如和数据库交互,这时候就需要用异步事件了,如果是简单处理,同步实现就可以。

 

这里以EventBus事件总线同步实现为例进行源码解析。

 

成为订阅者的源码实现

和@Subscribe注解配合使用的还有一个@AllowConcurrentEvents注解,这个注解是可以允许事件并发的执行,看下创建订阅者对象的源码实现,如下

/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 为监听器上的方法创建一个订阅服务器。*/static Subscriber create(EventBus bus, Object listener, Method method) {  return isDeclaredThreadSafe(method)      ? new Subscriber(bus, listener, method)      : new SynchronizedSubscriber(bus, listener, method);}

可以允许并发事件,在这个类中

@VisibleForTesting  static final class SynchronizedSubscriber extends Subscriber {    private SynchronizedSubscriber(EventBus bus, Object target, Method method) {      super(bus, target, method);    }    @Override    void invokeSubscriberMethod(Object event) throws InvocationTargetException {      synchronized (this) {        super.invokeSubscriberMethod(event);      }    }  }}

执行事件的时候是同步实现。

 

事件总线订阅源码实现

com.google.common.eventbus.SubscriberRegistry#register

void register(Object listener) {//    查找所有订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map    Multimap
, Subscriber> listenerMethods = findAllSubscribers(listener);    for (Entry
, Collection
> entry : listenerMethods.asMap().entrySet()) {//      获取事件类型      Class
eventType = entry.getKey();//      获取这个事件类型的订阅者集合      Collection
eventMethodsInListener = entry.getValue();//      从缓存中按事件类型查找订阅者集合      CopyOnWriteArraySet
eventSubscribers = subscribers.get(eventType);      if (eventSubscribers == null) {//        从缓存中取不到,更新缓存        CopyOnWriteArraySet
newSet = new CopyOnWriteArraySet<>();        eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);      } eventSubscribers.addAll(eventMethodsInListener);    } }

事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的

private final ConcurrentMap
, CopyOnWriteArraySet
> subscribers = Maps.newConcurrentMap();

到这里,订阅者已经准备好了,准备接受事件了。

 

发布事件源码实现

com.google.common.eventbus.EventBus#post

public void post(Object event) {//    获取事件的订阅者集合    Iterator
eventSubscribers = subscribers.getSubscribers(event);    if (eventSubscribers.hasNext()) {//      转发事件      dispatcher.dispatch(event, eventSubscribers);//      如果不是死亡事件,重新包装成死亡事件重新发布    } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent      post(new DeadEvent(this, event));    } }
Iterator
getSubscribers(Object event) {//    获取事件类型类的超类集合    ImmutableSet
> eventTypes = flattenHierarchy(event.getClass());    List
> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());    for (Class
eventType : eventTypes) {//      获取事件类型的订阅者集合      CopyOnWriteArraySet
eventSubscribers = subscribers.get(eventType);      if (eventSubscribers != null) { // eager no-copy snapshot        subscriberIterators.add(eventSubscribers.iterator());      } } return Iterators.concat(subscriberIterators.iterator());  }

事件转发器有三种实现

第一种是立即转发,实时性比较高,其他两种都是队列实现。

执行订阅方法都是异步实现

final void dispatchEvent(final Object event) {  executor.execute(      new Runnable() {        @Override        public void run() {          try {            invokeSubscriberMethod(event);          } catch (InvocationTargetException e) {            bus.handleSubscriberException(e.getCause(), context(event));          }        }      });}

 

说到最后

本次源码解析到这里,仅供参考。

 

转载于:https://my.oschina.net/u/3775437/blog/1841898

你可能感兴趣的文章
Linux下eclipse编译C/C++程序遇到 undefined reference to `pthread_create'的异常解决办法
查看>>
ajax上传图片的本质
查看>>
转]最长递增子序列问题的求解
查看>>
SilverLight:基础控件使用(6)-Slider控件
查看>>
Android写的一个设置图片查看器,可以调整透明度
查看>>
第 5 章 File Share
查看>>
判断字符串解析是JsonObject或者JsonArray
查看>>
[LeetCode] Implement strStr()
查看>>
多模块Struts应用程序的几个问题(及部分解决方法)
查看>>
1.2. MariaDB
查看>>
SpringSide示例之HelloWorld
查看>>
LINQ-to-SQL那点事~LINQ-to-SQL中的并发冲突与应对
查看>>
日志不说谎--Asp.net的生命周期
查看>>
C#~异步编程续~.net4.5主推的await&async应用
查看>>
C#进行MapX二次开发之图层操作
查看>>
ASP.NET 运行机制详解
查看>>
C++ little errors , Big problem
查看>>
iOS - Phone 电话
查看>>
根据点提取栅格值
查看>>
在 ML2 中配置 OVS vlan network - 每天5分钟玩转 OpenStack(136)
查看>>