文章目录问题场景:系统中@Scheduled设置的定时任务每两分钟执行一次,某一天上线了后发现@Scheduled不按时执行,经过排查发现是Spring内部某些内容共用了默认线程池,导致线程池资源不够而抢占资源导致。当时上线的是Kafka内容,引用了spring-kafka,经查发现其使用的线程池和@Scheduled是相同的默认线程池所以出现的问题,Kafka的大量并发持续占满线程池,导致定时任务抢占不到。
- 源码分析过程
- @Async
- @EnableAsync
- @EnbaleAsync的属性annotation
- AsyncConfigurationSelector
- ProxyAsyncConfiguration
- AbstractAsyncConfiguration
- AsyncAnnotationBeanPostProcessor
- AsyncAnnotationAdvisor
- 问题解决方法
下面是对@Async的源码进行分析为例,分析下线程池的创建过程。对源码不感兴趣的,可以跳到最后看解决这个问题的方法代码。 源码分析过程
@Async@EnableAsync 开启spring异步执行器,需要联合@Configuration注解一起使用
@Async 该注解可以标记一个异步执行的方法,也可以用来标注类,表示类中的所有方法都是异步执行的。
*注意:@Async如果A类的A.a()方法调用同类的`@Async A.b()`方法异,会变成同步,因为底层实现是代理对注解扫描实现的,A.a()方法上没有注解,没有生成相应的代理类* @EnableAsync返回值:void或AsyncResult或CompletableFuture
可以自己指定执行器的beanName: @Async(“xxxExecutor”)
直接看源码注释,解释的很清楚,重要的内容有下面这些(其它点在另一篇详细解析中介绍)
- 搭配@Configuration一起使用
- 默认Spring会寻找已经定义的线程池,已定义的org.springframework.core.task.TaskExecutor实例或名称为taskExecutor的实例,如果都没有则会用org.springframework.core.task.SimpleAsyncTaskExecutor。 默认异步的异常只会打印日志
- 想自定义线程池或者异常捕获可以通过实现AsyncConfigurer来实现,如下
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("MyExecutor-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncUncaughtExceptionHandler();
}
}
@EnbaleAsync的属性annotation
定义需要被扫描的注解,默认的Spring的@Async和EJB的@javax.ejb.Asynchronous注解会被扫描,用户可以通过这个属性设置自定义的注解来被扫描为异步类或方法
@import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class extends Annotation> annotation() default Annotation.class;
... ...
}
如果你了解SpringBoot的自动装配原理,就知道看这类源码一眼看到就是@import(AsyncConfigurationSelector.class)
AsyncConfigurationSelector@Override
@Nullable
public String[] selectimports(AdviceMode adviceMode) {
switch (adviceMode) {
// EnableAsync中`AdviceMode mode() default AdviceMode.PROXY;`
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
EnableAsync中AdviceMode mode() default AdviceMode.PROXY;,所以接下来看ProxyAsyncConfiguration这个类
ProxyAsyncConfiguration这个类继承自AbstractAsyncConfiguration,同时创建了一个AsyncAnnotationBeanPostProcessor异步注解后处理器实例
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
// 设置上下文中实现AsyncConfigurer的实例中定义的线程执行器和异步异常处理器
// @link org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers
bpp.configure(this.executor, this.exceptionHandler);
// 获取用户自定义扫描注解,如果和默认的(`@Async`和`@javax.ejb.Asynchronous`)不一致采用用户设置注解
Class extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.getNumber("order"));
return bpp;
}
AbstractAsyncConfiguration
AsyncAnnotationBeanPostProcessor设置异步扫描注解元数据,@Autowired设置被实例化的AsyncConfigurer中返回的线程执行器和异步异常处理器
实现了BeanFactoryAware接口,实例创建后会调用实现这个接口的setBeanFactory(BeanFactory beanFactory)覆盖方法。追到最后发现这是用来提供通过beanFactory获取默认Executor的。代码在这里AsyncExecutionAspectSupport.getDefaultExecutor
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(~~beanFactory~~);
this.advisor = advisor;
}
这里面就做了一件事,创建AsyncAnnotationAdvisor异步注解切面实例对象,Let`s see see这个构造方法
AsyncAnnotationAdvisor上面说的默认扫描注解@Async和@javax.ejb.Asynchronous,就是在这里设置的
public AsyncAnnotationAdvisor(@Nullable Supplierexecutor, @Nullable Supplier exceptionHandler) { Set > asyncAnnotationTypes = new linkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous",AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } // 这玩意儿后面会用到,在父类的AbstractAdvisingBeanPostProcessor.postProcessAfterInitialization中 this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); }
- buildAdvice是设置线程执行器和异常处理器,最终调用的是AsyncExecutionAspectSupport的构造方法和configure方法,上面说的默认线程执行器就是在这里设置的
org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildAdvice ->
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure ->
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor - buildPointcut是设置切面规则,就是告知这个Advisor对应哪些注解类型,这里默认就是@Async和@javax.ejb.Asynchronous
了解SpringBoot的实例创建过程,就会知道这个XxxPostProcessor,一般都会伴随着实例初始化后的SpringBoot做的一个后置回调,果然发现其实现了BeanPostProcessor接口,在AbstractAdvisingBeanPostProcessor中复写了postProcessAfterInitialization方法,主要是用来给符合条件的实例创建代理对象
public Object postProcessAfterInitialization(Object bean, String beanName) {
... ...
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
return bean;
}
isEligible用来判断这个类是否和this.advisor关联,其中this.advisor就是上面AsyncAnnotationBeanPostProcessor.setBeanFactory中创建的AsyncAnnotationAdvisor实例,实现方法是AopUtils.canApply(this.advisor, targetClass),AsyncAnnotationAdvisor实现PointcutAdvisor接口
public static boolean canApply(Advisor advisor, Class> targetClass, boolean hasIntroductions) {
if (advisor instanceof IntroductionAdvisor) {
return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
} else if (advisor instanceof PointcutAdvisor) {
// 这个pca.getPointcut()是AsyncAnnotationAdvisor初始化时设置的切入点,也就是注解类型即默认的@Async和@javax.ejb.Asynchronous
PointcutAdvisor pca = (PointcutAdvisor) advisor;
return canApply(pca.getPointcut(), targetClass, hasIntroductions);
} else {
// It doesn't have a pointcut so we assume it applies.
return true;
}
}
AOP动态代理如何被拦截的将会有一篇单独的文章介绍,最后被调用的就是执行了AsyncExecutionInterceptor.invoke
调用了父类方法AsyncExecutionAspectSupport#determineAsyncExecutor获取线程执行器,通过看代码得到的是默认的执行器this.defaultExecutor.get()
给@Async 和 @Scheduled创建单独的线程池
@Configuration
@EnableAsync
public class AsyncConfigration implements AsyncConfigurer {
private static final int SCHEDULE_CORE_POOL_SIZE = 30;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(SCHEDULE_CORE_POOL_SIZE);
executor.initialize();
return executor;
}
}
@Configuration
@EnableScheduling
public class ScheduleConfigration implements SchedulingConfigurer {
private static final int SCHEDULE_CORE_POOL_SIZE = 10;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(SCHEDULE_CORE_POOL_SIZE);
taskRegistrar.setScheduler(poolExecutor);
}
}



