栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Eventbus 3.3.1源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Eventbus 3.3.1源码分析

EventBus

EventBus is a publish/subscribe event bus for Android and Java.

EventBus是适用于Android和Java的发布/订阅事件总线。

优点:

简化组件之间的通信

将事件发送方和接收方解耦在活动、片段和后台线程中表现良好避免复杂且容易出错的依赖关系和生命周期问题 使代码更简单速度快很小(~60k jar)在实践中被安装量超过 1,000,000,000 的应用证明具有高级功能,如交付线程,订阅者优先级等。

1 简单使用 1.1 定义事件
class MessageEvent(val msg:String)
1.2 定义订阅
class HomeActivity : AppCompatActivity() {
    
	override fun onStart() {
         super.onStart()
         EventBus.getDefault().register(this)
 	}

    override fun onStop() {
         super.onStop()
         EventBus.getDefault().unregister(this)
 	}

    @Subscribe(threadMode = ThreadMode.MAIN)
    fun onMessageEvent(messageEvent: MessageEvent){
        val msg = messageEvent.msg
        when(msg){

            "day"->{
 				//...
            }
        }
    }
    
}
1.3 发送事件
 EventBus.getDefault().post(MessageEvent("day"))

2 主要原理 2.1 角色

Event(事件):自定义事件,EventBus会根据事件的类型全局发送

Subscriber(订阅者):主要指需要接收事件的 Activitys 和 Fragments 。在 Activitys 和 Fragments 的生命周期中订阅/注销订阅,接收事件的方法以注解@subscribe进行标示,同时指定线程模型,线程模型默认为POSTING的方式。

Publisher(发布者):事件的发布者。可以在任意线程中发布事件。利用EventBus.getDefault()获取一个全局静态对象EventBus,再通过链式编程调用post()将不同类型的事件发送。

2.2 线程模型

ThreadMode(线程模型)描述事件订阅者和事件发布者所在线程的关系。

ThreadMode主要有5种模式:POSTING、MAIN、MAIN_ORDERED、BACKGROUND、ASYNC(异步)

    POSTING

    事件订阅的默认线程。即事件订阅和事件发布在同一个线程。避免了切换线程的开销。对于已知在非常短的时间内完成而不需要主线程的简单任务,这是推荐的模式。使用此模式的事件处理程序必须快速返回,以避免阻塞可能是主线程的发布线程。

    MAIN

    事件处理程序方法在Android 的主线程(UI线程)被调用。使用该模式的订阅器必须快速执行, 避免阻塞主线程

    如果发布线程是主线程,则直接调用事件处理方法(即上方的onMessageEvent()),此时为同步调用,阻塞主线程,与POSTING效果一样

    否则,事件将排队等待传递(非阻塞)。

    MAIN_ORDERED

    事件处理程序方法在Android 的主线程(UI线程)被调用。与MAIN模式不同,事件将始终排队等待传递(非阻塞)。

    使用该模式的订阅器也必须快速执行, 避免阻塞主线程。

MAIN与MAIN_ORDERED线程模式区别:

MAIN :事件处理程序中发布另一个事件,则第二个事件处理程序将在第一个事件处理程序之前完成(因为它是同步调用的)。

MAIN_ORDERED:第一个事件处理程序将完成,然后第二个事件处理程序将在稍后的时间点(一旦主线程具有容量)被调用。

    BACKGROUND

    事件处理程序方法在后台线程被调用。

    发布线程不是主线程,直接在发布线程调用事件处理方法

    发布线程是主线程,EventBus 将使用单个后台线程,该线程将按顺序传递其所有事件。使用此模式的事件处理程序应尝试快速返回,以避免阻塞后台线程。

    ASYNC(异步)

    事件处理程序方法在单独的线程中调用。始终独立于发布线程和主线程。直接在单独的线程调用事件处理程序方法,不等待。

    如果事件处理程序方法的执行可能需要一些时间(例如,用于网络访问),则应使用此模式。避免同时触发大量长时间运行的异步处理程序方法来限制并发线程数。EventBus 使用线程池来有效地重用已完成的异步事件处理程序通知中的线程。

2.3 事件类型

普通事件

普通事件是指已有的订阅者能够收到发送的事件,在事件发送之后注册的事件接收者将无法收到事件。

//发送普通事件
EventBus.getDefault().post(new MessageEvent("day"));

粘性事件

粘性事件是指,不管是在事件发送之前注册的事件接收者还是在事件发送之后注册的事件接收者都能够收到事件。默认为普通事件。

//发送粘性事件
EventBus.getDefault().postSticky(new MessageEvent("Hello everyone!"));

//事件处理
@Subscribe(sticky = true, threadMode = ThreadMode.MAIN)
public void onEvent(MessageEvent event) {   
    textField.setText(event.message);
}

事件优先级

订阅者优先级以影响事件传递顺序。在同一传递线程ThreadMode中,优先级较高的订阅者将在优先级较低的其他订阅者之前接收事件。默认优先级为0。

注意:优先级不影响具有不同ThreadMode的订阅服务器之间的传递顺序

//定义事件接收的优先级
@Subscribe(priority = 1);
public void onEvent(MessageEvent event) {
    ...
}

3 源码解析 1 初始化 getDafault()
public class EventBus {
    static volatile EventBus defaultInstance;

    public static EventBus getDefault() {
        EventBus instance = defaultInstance;
        if (instance == null) {
            synchronized (EventBus.class) {
                instance = EventBus.defaultInstance;
                if (instance == null) {
                    instance = EventBus.defaultInstance = new EventBus();
                }
            }
        }
        return instance;
    }
}

采用单例模式的DCL式创建全局单例对象EventBus

EventBus的构造函数:

//EventBus.java
public class EventBus {
	//EventBuilder对象,初始化其内部属性
    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

    //事件类型缓存
    //key-事件Class对象,value-事件父类型及本类型
    private static final Map, List>> eventTypesCache = new HashMap<>();
    
	//根据不同事件类型存储订阅者: subscriptionsByEventType
    //key-事件Class对象,value-订阅者集合
    private final Map, CopyOnWriteArrayList> subscriptionsByEventType;
    
    //不同订阅者的事件集合: typesBySubscriber
    //key-具体订阅者对象,value-事件Class集合
    private final Map>> typesBySubscriber;
    
    //粘性事件集合: stickyEvents
    //key-事件Class对象,value-具体事件对象
    private final Map, Object> stickyEvents;
    
    //主线程事件发送支持(含判断是否主线程,创建主线程事件发送者)
    private final MainThreadSupport mainThreadSupport;
 
    private final Poster mainThreadPoster;//主线程事件发送者
    private final BackgroundPoster backgroundPoster;//后台线程事件发送者
    private final AsyncPoster asyncPoster;//异步线程事件发送者
    
    //订阅事件查找:根据订阅者Class,找到订阅的事件集合
    private final SubscriberMethodFinder subscriberMethodFinder;
    private final ExecutorService executorService;

	public EventBus() {
        //传入静态EventBuilder对象
        this(DEFAULT_BUILDER);
    }

    EventBus(EventBusBuilder builder) {
        //日志
        logger = builder.getLogger();        
        subscriptionsByEventType = new HashMap<>(); 
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        mainThreadSupport = builder.getMainThreadSupport();        
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;                
        backgroundPoster = new BackgroundPoster(this);                
        asyncPoster = new AsyncPoster(this);        
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }
    //...
}

2 注册订阅 register(Object subscriber)
//EventBus.java
	public void register(Object subscriber) {
		//1. 反射获得订阅者的Class对象
        Class subscriberClass = subscriber.getClass();
        //2. 通过subscriberMethodFinder找到订阅者所订阅的事件集合
        List subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                //3. 遍历集合进行注册
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    SubscriberMethodFinder.findSubscriberMethods(subscriberClass):根据订阅者Class找到订阅者订阅的事件集合
//SubscriberMethodFinder.java

    private static final Map, List> METHOD_CACHE = new ConcurrentHashMap<>();

	List findSubscriberMethods(Class subscriberClass) {
        //1. 从缓存获取
        List subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
		//2. 获取订阅者自定义的订阅方法集合
        if (ignoreGeneratedIndex) {
            //a. findUsingReflection
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //b. findUsingInfo
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //3. 放入缓存
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

a. findUsingReflection

//SubscriberMethodFinder.java
	private List findUsingReflection(Class subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            //利用反射获取订阅方法及注释属性
            findUsingReflectionInSingleClass(findState);
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

b. findUsingInfo

//SubscriberMethodFinder.java
	private List findUsingInfo(Class subscriberClass) {
        // 1. 从数组中获取FindState对象
        // 如果有直接返回,如果没有创建一个新的FindState对象
        FindState findState = prepareFindState();
        // 2. 根据事件订阅者初始化findState
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 3. 获取subscriberInfo,初始化为null
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                // 4. 通过反射的方式获取订阅者中的Method
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

	// 从findState中获取订阅者所有方法并释放
    private List getMethodsAndRelease(FindState findState) {
         // 获取订阅者所有订阅方法集合
        List subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        // findState进行回收
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

两个方法最后都会调用findUsingReflectionInSingleClass:利用反射获取订阅方法的注释属性

    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 1. 订阅者中所有声明的方法,放入数组中
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // 2. 获取订阅者中声明的public方法,设置跳过父类
            methods = findState.clazz.getMethods();
          
            findState.skipSuperClasses = true;
        }
        
        for (Method method : methods) {
            // 3. 获取方法的修饰符:public、private等等
            int modifiers = method.getModifiers();
            // 4. 订阅方法为public同时不是abstract、static
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                //方法参数类型数组
                Class[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                     // 5. 获取方法的注解
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class eventType = parameterTypes[0];
                        
                        // 6. 将method和eventType放入到findState进行检查
                        if (findState.checkAdd(method, eventType)) {
                            
                            //7. 获取注解中的threadMode对象
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            
                            //8. 新建一个SubscriberMethod对象,同时加入到findState中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            } 
        }
    }
SubscriberMethod
//SubscriberMethod.java
public class SubscriberMethod {
    final Method method;//实际事件处理方法对象
    final ThreadMode threadMode;//线程模式
    final Class eventType;//事件类型
    final int priority;//优先级
    final boolean sticky;//粘性事件
    String methodString;
}
Subscription
final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
}
    订阅者进行注册:subscribe(Object subscriber, SubscriberMethod subscriberMethod)
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //1. 获取事件类型
        Class eventType = subscriberMethod.eventType;
        //2. 将订阅者和订阅方法封装成Subscription对象
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //3. 通过事件类型获取该事件的订阅者集合
        CopyOnWriteArrayList subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            // 如果没有订阅者订阅该事件,创建集合,存入subscriptionsByEventType集合中
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            // 如果有订阅者已经订阅了该事件,判断这些订阅者中是否有重复订阅的现象
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        
        //4. 遍历该事件的所有订阅者
        for (int i = 0; i <= size; i++) {
            // 按照优先级高低进行插入,如果优先级最低,插入到集合尾部
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        //5. 获取该事件订阅者订阅的所有事件集合,将该事件加入到集合中
        List> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        //6. 判断该事件是否是粘性事件
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                //判断事件的继承性,默认是不可继承
                //获取所有粘性事件并遍历,判断继承关系
                Set, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry, Object> entry : entries) {
                    Class candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                         //7. 调用checkPostStickyEventToSubscription方法
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

    private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }

	//根据ThreadMode线程模式不同,来处理事件
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

    void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

	void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //利用反射调用实际的处理事件方法
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
订阅事件步骤
    根据单例设计模式创建一个EventBus对象,同时创建一个EventBus.Builder对象对EventBus进行初始化调用register方法,首先通过反射获取到订阅者的Class对象。通过SubscriberMethodFinder对象获取订阅者中所有订阅的事件集合,它先从缓存中获取;如果缓存中没有,通过反射的方式去遍历订阅者内部被注解的方法,将这些方法放入到集合中进行返回。遍历第三步获取的集合,将订阅者和事件进行绑定。在绑定之后会判断绑定的事件是否是粘性事件,如果是粘性事件,直接调用postToSubscription方法,将之前发送的粘性事件发送给订阅者。

3 发送事件 3.1 post(Object event)
//EventBus.java
	public void post(Object event) {
        // 1、获取当前线程的PostingThreadState
        PostingThreadState postingState = currentPostingThreadState.get();
        // 2、当前线程的事件集合
        List eventQueue = postingState.eventQueue;
        // 3、将要发送的事件加入到集合中
        eventQueue.add(event);

        if (!postingState.isPosting) {
            // 判断是否是主线程
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                // 4、只要事件集合中还有事件,就一直发送
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

 

currentPostingThreadState是存储PostingThreadState的ThreadLocal对象

ThreadLocal是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据, 并且线程之间的数据是相互独立的。其内部通过创建一个它包裹的泛型对象的数组,不同的线程对应不同的数组索引,每个线程通过get方法获取对应的线程数据。

如果一个变量只要被某个线程独享,可以通过java.lang.ThreadLocal类来实现线程本地存储的功能。每一个线程的Thread对象中都有一个ThreadLocalMap对象,这个对象存储了一组以ThreadLocal.threadLocalHashCode为键,以本地线程变量为值的K-V值对,ThreadLocal对象就是当前线程的ThreadLocalMap的访问入口,每一个ThreadLocal对象都包含了一个独一无二的threadLocalHashCode值,使用这个值就可以在线程K-V值对中找回对应的本地线程变量。

//EventBus.java
    private final ThreadLocal currentPostingThreadState = new ThreadLocal() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

	 // 每个线程中存储的数据
    final static class PostingThreadState {
        final List eventQueue = new ArrayList<>();// 线程的事件队列
        boolean isPosting;//是否正在发送中
        boolean isMainThread;//是否主线程
        Subscription subscription;//事件订阅者和订阅事件的封装
        Object event;//事件对象
        boolean canceled;//是否取消
    }
 

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    //1. 获取事件的Class对象
    Class eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
         //2. 找到当前的event的所有 父类和实现的接口 的class集合
        List> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class clazz = eventTypes.get(h);
            //3. 遍历集合发送单个事件
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

    private static List> lookupAllEventTypes(Class eventClass) {
        synchronized (eventTypesCache) {
            //获取事件集合
            List> eventTypes = eventTypesCache.get(eventClass);
            if (eventTypes == null) {
                eventTypes = new ArrayList<>();
                Class clazz = eventClass;
                while (clazz != null) {
                    //添加事件
                    eventTypes.add(clazz);
                    //添加当前事件的接口class
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    // 获取当前事件的父类
                    clazz = clazz.getSuperclass();
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }
	
	//循环添加当前事件的接口class
    static void addInterfaces(List> eventTypes, Class[] interfaces) {
        for (Class interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) {
    CopyOnWriteArrayList subscriptions;
    synchronized (this) {
        //1. 根据事件获取所有订阅它的订阅者
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        //遍历集合
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted;
            try {
                //2. 将事件发送给订阅者
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

根据订阅方法的线程模式调用订阅方法

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
发布事件步骤
    获取当前线程的事件集合,将要发送的事件加入到集合中。通过循环,只要事件集合中还有事件,就一直发送。获取事件的Class对象,找到当前的event的所有父类和实现的接口的class集合。遍历这个集合,调用发送单个事件的方法进行发送。根据事件获取所有订阅它的订阅者集合,遍历集合,将事件发送给订阅者。发送给订阅者时,根据订阅方法的线程模式调用订阅方法,如果需要线程切换,则切换线程进行调用;否则,直接调用。
3.2 postSticky(Object event)
    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        post(event);
    }

1、将粘性事件加入到EventBus对象的粘性事件集合中,当有新的订阅者进入后,如果该订阅者订阅了该粘性事件,可以直接发送给订阅者。
2、将粘性事件发送给已有的事件订阅者。

4 取消注册 unregister(Object subscriber)
    public synchronized void unregister(Object subscriber) {
        //1. 获取订阅者订阅的所有事件
        List> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
             // 2. 遍历集合
            for (Class eventType : subscribedTypes) {
                // 3. 将该订阅者的从订阅该事件的所有订阅者集合中移除
                unsubscribeByEventType(subscriber, eventType);
            }
            // 4. 将订阅者从集合中移除
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

    private void unsubscribeByEventType(Object subscriber, Class eventType) {
        // 获取该事件的所有订阅者
        List subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                // 将订阅者从集合中移除
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

1、获取订阅者的所有订阅方法,遍历这些方法。然后拿到每个方法对应的所有订阅者集合,将订阅者从集合中移除。
2、移除订阅者中所有的订阅方法。

5 线程切换

根据订阅方法的线程模式调用订阅方法

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
5.1 mainThreadPoster

mainThreadPoster是EventBus进行初始化时创建的

    EventBus(EventBusBuilder builder) {
        mainThreadSupport = builder.getMainThreadSupport();            
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    }
主线程事件发送支持:MainThreadSupport
public interface MainThreadSupport {
    boolean isMainThread();
    Poster createPoster(EventBus eventBus);
}
发送者接口(Poster)
public interface Poster {
    void enqueue(Subscription subscription, Object event);
}
    MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (AndroidComponents.areAvailable()) {
            return AndroidComponents.get().defaultMainThreadSupport;
        } else {
            return null;
        }
    }

通过AndroidComponents.get().defaultMainThreadSupport获得mainThreadPoster,AndroidComponents是一个抽象类,实现类是AndroidComponentsImpl

public abstract class AndroidComponents {

    private static final AndroidComponents implementation;

    static {
        implementation = AndroidDependenciesDetector.isAndroidSDKAvailable()
            ? AndroidDependenciesDetector.instantiateAndroidComponents()
            : null;
    }

    public static boolean areAvailable() {
        return implementation != null;
    }

    public static AndroidComponents get() {
        return implementation;
    }

    public final Logger logger;
    public final MainThreadSupport defaultMainThreadSupport;

    public AndroidComponents(Logger logger, MainThreadSupport defaultMainThreadSupport) {
        this.logger = logger;
        this.defaultMainThreadSupport = defaultMainThreadSupport;
    }
}

AndroidComponentsImpl里面又创建了DefaultAndroidMainThreadSupport

public class AndroidComponentsImpl extends AndroidComponents {

    public AndroidComponentsImpl() {
        super(new AndroidLogger("EventBus"), new DefaultAndroidMainThreadSupport());
    }
}

mainThreadPoster最终其实返回的是HandlerPoster对象

public class DefaultAndroidMainThreadSupport implements MainThreadSupport {

    @Override
    public boolean isMainThread() {
        return Looper.getMainLooper() == Looper.myLooper();
    }

    @Override
    public Poster createPoster(EventBus eventBus) {
        return new HandlerPoster(eventBus, Looper.getMainLooper(), 10);
    }
}
HandlerPoster

HandlerPoster继承Handler并实现了Poster接口

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();//事件队列
    }

    public void enqueue(Subscription subscription, Object event) {
        //将事件封装成PendingPost
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //将事件入队
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                //调用sendMessage向主线程发送
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //使用反射的方法调用订阅者的订阅方法
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

5.2 backgroundPoster
public class EventBus {
	private final BackgroundPoster backgroundPoster;
    private final ExecutorService executorService;//线程池

	EventBus(EventBusBuilder builder) {
              
        backgroundPoster = new BackgroundPoster(this);                
		executorService = builder.executorService;
    }
}

public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}

public class Executors {
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
}

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();//事件队列
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                //使用线程池
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

5.3 asyncPoster
class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

asyncPoster原理还是线程池,不过它是在单独的线程中调用

线程切换总结

线程切换主要分析了三个Poster,他们之间的原理区别:

mainThreadPoster:利用Handler发送信息到主线程。backgroundPoster:利用线程池和事件队列实现,EventBus 将使用单个后台线程,该线程将按顺序传递其所有事件。asyncPoster:利用线程池,在单独的线程中回调事件处理

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号