栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

第2章 Spring 订阅与发布的原理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

第2章 Spring 订阅与发布的原理

目录
        • 注册事件广播器
        • 注册事件监听器
          • 加载事件监听器
        • 发布事件
        • 处理事件

在之前了解Spring的订阅与发布是如何使用以后,我们来看Spring是如何实现订阅与发布。Spring的订阅与发布主要分为四个模块:

  • 注册事件广播器 ApplicationEventMulticaster
  • 注册事件监听器 ApplicationEventListener
  • 发布事件
  • 处理监听事件
注册事件广播器

Spring在注册事件广播器时,会调用 AbstractApplicationContext#refresh 方法。这个方法是用来初始化容器的,这个方法会注册Bean对象、初始化Bean对象等等,这里只需要关注 initApplicationEventMulticaster() 方法,这个方法就是用来注册事件广播器的。来看一下这个方法的实现。

protected void initApplicationEventMulticaster() {
        ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
        if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
            this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
            }
        } else {
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);
            }
        }
    }

这个方法两件事:

  • 当前容器已经注册了名为 applicationEventMulticaster 的对象,直接获取这个已经注册好的事件广播器。例如,我们通过配置类去提前配置事件广播器对象。

    @Configuration
    public class AsynchronousSpringEventsConfig {
    
        @Bean(name = "applicationEventMulticaster")
        public ApplicationEventMulticaster applicationEventMulticaster() {
            SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
            multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
            return multicaster;
        }
    }
    
  • 如果当前容器没有提前配置,那么当前容器获取不到这个事件广播器对象,需要由Spring自己创建一个 SimpleApplicationEventMulticaster 对象,并把它注册到容器中。SimpleApplicationEventMulticaster 是Spring默认事件广播器。

注册事件监听器 加载事件监听器

Spring是如何加载我们自定义的监听器的呢?自定义的事件监听器是由Spring的 ApplicationListenerDetector 后置处理器的 postProcessAfterInitialization 方法加载的。具体源码如下所示。

class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor, MergedBeanDefinitionPostProcessor {
    
    @Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof ApplicationListener) {
			Boolean flag = this.singletonNames.get(beanName);
			if (Boolean.TRUE.equals(flag)) {
				this.applicationContext.addApplicationListener((ApplicationListener) bean);
			}
			else if (Boolean.FALSE.equals(flag)) {
				this.singletonNames.remove(beanName);
			}
		}
		return bean;
	}
}

从 bean instanceof ApplicationListener 这里,我们就知道为什么自定义事件监听器需要实现 ApplicationListener 接口了。

在来看一下 applicationContext.addApplicationListener 的源码。

public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
    
    public void addApplicationListener(ApplicationListener listener) {
        Assert.notNull(listener, "ApplicationListener must not be null");
        if (this.applicationEventMulticaster != null) {
            this.applicationEventMulticaster.addApplicationListener(listener);
        }

        this.applicationListeners.add(listener);
    }
}

到这里,你可能还有一个疑问?后置处理器是在什么时候调用的。我们回到 AbstractApplicationContext#refresh 方法。

public void refresh() throws BeansException, IllegalStateException {
        synchronized(this.startupShutdownMonitor) {
            //省略其他方法
            	this.initApplicationEventMulticaster();
                this.registerListeners();
            // 这里调用后置处理器方法
            	finishBeanFactoryInitialization(beanFactory);
            // 省略其他方法
        }
    }

很显然,后置处理器 ApplicationListenerDetector 的 postProcessAfterInitialization方法是在 finishBeanFactoryInitialization 内调用的。

至于 BeanPostProcessor 是干什么的、如何去使用、实现原理,这里就不展开了,可以详细看看这里:

  • 【Spring探秘|妙用BeanPostProcessor】
  • 【Spring中的BeanPostProcessor】

那 this.registerListeners() 方法有什么用了?这个方法可以用来通过 context#addApplicationListener 来提前注册监听器。具体源码如下所示。

protected void registerListeners() {
	// 注册静态指定的监听器
    for (ApplicationListener listener : getApplicationListeners()) {
        getApplicationEventMulticaster().addApplicationListener(listener);
    }

    // Do not initialize FactoryBeans here: We need to leave all regular beans
    // uninitialized to let post-processors apply to them!
    String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
    for (String listenerBeanName : listenerBeanNames) {
        // 这里只是注册了监听器的BeanName
        getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
    }

    // 提前发布事件
    Set earlyEventsToProcess = this.earlyApplicationEvents;
    this.earlyApplicationEvents = null;
    if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
        for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
            getApplicationEventMulticaster().multicastEvent(earlyEvent);
        }
    }
}
发布事件

发布事件逻辑比较简单,如果事件没有实现 ApplicationEvent 接口,需要把事件单独处理。具体源码如下

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
		Assert.notNull(event, "Event must not be null");

		ApplicationEvent applicationEvent;
    	// 处理实现 ApplicationEvent 接口的对象
		if (event instanceof ApplicationEvent) {
			applicationEvent = (ApplicationEvent) event;
		}
		else {
        // 处理Object类型的对象
			applicationEvent = new PayloadApplicationEvent<>(this, event);
			if (eventType == null) {
				eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
			}
		}

       // 这里留一个疑问,什么时候会有这种场景
		if (this.earlyApplicationEvents != null) {
			this.earlyApplicationEvents.add(applicationEvent);
		}
		else {
            // 把事件交给事件广播处理器
			getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
		}

		// 父容器不为空,向父容器发布事件
        // 这里留一个疑问, 什么时候会有这种场景
		if (this.parent != null) {
			if (this.parent instanceof AbstractApplicationContext) {
				((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
			}
			else {
				this.parent.publishEvent(event);
			}
		}
	}
处理事件

之前说过Spring默认处理事件的同步的方式,如果是要异步执行的话,我们需要单独配置一个线程池。事件广播器处理事件的逻辑:首先获取 executor 线程池,如果没有就同步地执行;如果有,就异步地执行;获取事件 event 所匹配的一组监听器,然后遍历每个监听器,执行事件处理。具体源码如下。

@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    // 获取线程池
    Executor executor = getTaskExecutor();
    // 获取event所匹配的监听器
    for (ApplicationListener listener : getApplicationListeners(event, type)) {
        // 有,异步执行
        if (executor != null) {
            executor.execute(() -> invokeListener(listener, event));
        }
        else {
            // 无,同步执行
            invokeListener(listener, event);
        }
    }
}

你可能会想这里是遍历一组监听器去处理事件,假设其中一个监听器处理事件抛出异常的话,会怎么样。看一下是 invokeListener 方法是怎么处理的。首先,看是否配置 ErrorHandler ,这个是用来处理异常的;如果没有配置的话,异常会直接抛出中断后面监听器处理 (没有开启异步执行事件处理)。

protected void invokeListener(ApplicationListener listener, ApplicationEvent event) {
    ErrorHandler errorHandler = getErrorHandler();
    if (errorHandler != null) {
        try {
            // 真正执行监听器的处理事件的方法
            doInvokeListener(listener, event);
        }
        catch (Throwable err) {
            errorHandler.handleError(err);
        }
    }
    else {
        // 真正执行监听器的处理事件的方法
        doInvokeListener(listener, event);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/439587.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号