栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

@Async @Scheduled的并发问题

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

@Async @Scheduled的并发问题

问题场景:系统中@Scheduled设置的定时任务每两分钟执行一次,某一天上线了后发现@Scheduled不按时执行,经过排查发现是Spring内部某些内容共用了默认线程池,导致线程池资源不够而抢占资源导致。当时上线的是Kafka内容,引用了spring-kafka,经查发现其使用的线程池和@Scheduled是相同的默认线程池所以出现的问题,Kafka的大量并发持续占满线程池,导致定时任务抢占不到。

文章目录
  • 源码分析过程
    • @Async
    • @EnableAsync
      • @EnbaleAsync的属性annotation
      • AsyncConfigurationSelector
      • ProxyAsyncConfiguration
      • AbstractAsyncConfiguration
      • AsyncAnnotationBeanPostProcessor
      • AsyncAnnotationAdvisor
  • 问题解决方法

下面是对@Async的源码进行分析为例,分析下线程池的创建过程。对源码不感兴趣的,可以跳到最后看解决这个问题的方法代码。

源码分析过程

@EnableAsync 开启spring异步执行器,需要联合@Configuration注解一起使用
@Async 该注解可以标记一个异步执行的方法,也可以用来标注类,表示类中的所有方法都是异步执行的。

@Async

返回值:void或AsyncResult或CompletableFuture
可以自己指定执行器的beanName: @Async(“xxxExecutor”)

*注意:@Async如果A类的A.a()方法调用同类的`@Async A.b()`方法异,会变成同步,因为底层实现是代理对注解扫描实现的,A.a()方法上没有注解,没有生成相应的代理类* @EnableAsync

直接看源码注释,解释的很清楚,重要的内容有下面这些(其它点在另一篇详细解析中介绍)

  1. 搭配@Configuration一起使用
  2. 默认Spring会寻找已经定义的线程池,已定义的org.springframework.core.task.TaskExecutor实例或名称为taskExecutor的实例,如果都没有则会用org.springframework.core.task.SimpleAsyncTaskExecutor。 默认异步的异常只会打印日志
  3. 想自定义线程池或者异常捕获可以通过实现AsyncConfigurer来实现,如下
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }
}
@EnbaleAsync的属性annotation

定义需要被扫描的注解,默认的Spring的@Async和EJB的@javax.ejb.Asynchronous注解会被扫描,用户可以通过这个属性设置自定义的注解来被扫描为异步类或方法

@import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    Class annotation() default Annotation.class;
    ... ...
}

如果你了解SpringBoot的自动装配原理,就知道看这类源码一眼看到就是@import(AsyncConfigurationSelector.class)

AsyncConfigurationSelector
@Override
@Nullable
public String[] selectimports(AdviceMode adviceMode) {
    switch (adviceMode) {
        // EnableAsync中`AdviceMode mode() default AdviceMode.PROXY;`
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}

EnableAsync中AdviceMode mode() default AdviceMode.PROXY;,所以接下来看ProxyAsyncConfiguration这个类

ProxyAsyncConfiguration

这个类继承自AbstractAsyncConfiguration,同时创建了一个AsyncAnnotationBeanPostProcessor异步注解后处理器实例

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    
    // 设置上下文中实现AsyncConfigurer的实例中定义的线程执行器和异步异常处理器
    // @link org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers
    bpp.configure(this.executor, this.exceptionHandler);
    
    // 获取用户自定义扫描注解,如果和默认的(`@Async`和`@javax.ejb.Asynchronous`)不一致采用用户设置注解
    Class customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
        bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.getNumber("order"));
    return bpp;
}
AbstractAsyncConfiguration

设置异步扫描注解元数据,@Autowired设置被实例化的AsyncConfigurer中返回的线程执行器和异步异常处理器

AsyncAnnotationBeanPostProcessor

实现了BeanFactoryAware接口,实例创建后会调用实现这个接口的setBeanFactory(BeanFactory beanFactory)覆盖方法。追到最后发现这是用来提供通过beanFactory获取默认Executor的。代码在这里AsyncExecutionAspectSupport.getDefaultExecutor

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(~~beanFactory~~);
    this.advisor = advisor;
}

这里面就做了一件事,创建AsyncAnnotationAdvisor异步注解切面实例对象,Let`s see see这个构造方法

AsyncAnnotationAdvisor

上面说的默认扫描注解@Async和@javax.ejb.Asynchronous,就是在这里设置的

public AsyncAnnotationAdvisor(@Nullable Supplier executor, @Nullable Supplier exceptionHandler) {
    Set> asyncAnnotationTypes = new linkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class)ClassUtils.forName("javax.ejb.Asynchronous",AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }
    // 这玩意儿后面会用到,在父类的AbstractAdvisingBeanPostProcessor.postProcessAfterInitialization中
    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}
  1. buildAdvice是设置线程执行器和异常处理器,最终调用的是AsyncExecutionAspectSupport的构造方法和configure方法,上面说的默认线程执行器就是在这里设置的
    org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildAdvice ->
    org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure ->
    org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
  2. buildPointcut是设置切面规则,就是告知这个Advisor对应哪些注解类型,这里默认就是@Async和@javax.ejb.Asynchronous

了解SpringBoot的实例创建过程,就会知道这个XxxPostProcessor,一般都会伴随着实例初始化后的SpringBoot做的一个后置回调,果然发现其实现了BeanPostProcessor接口,在AbstractAdvisingBeanPostProcessor中复写了postProcessAfterInitialization方法,主要是用来给符合条件的实例创建代理对象

public Object postProcessAfterInitialization(Object bean, String beanName) {
    ... ...
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }
    return bean;
}

isEligible用来判断这个类是否和this.advisor关联,其中this.advisor就是上面AsyncAnnotationBeanPostProcessor.setBeanFactory中创建的AsyncAnnotationAdvisor实例,实现方法是AopUtils.canApply(this.advisor, targetClass),AsyncAnnotationAdvisor实现PointcutAdvisor接口

public static boolean canApply(Advisor advisor, Class targetClass, boolean hasIntroductions) {
    if (advisor instanceof IntroductionAdvisor) {
        return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    } else if (advisor instanceof PointcutAdvisor) {
        // 这个pca.getPointcut()是AsyncAnnotationAdvisor初始化时设置的切入点,也就是注解类型即默认的@Async和@javax.ejb.Asynchronous
        PointcutAdvisor pca = (PointcutAdvisor) advisor;
        return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    } else {
        // It doesn't have a pointcut so we assume it applies.
        return true;
    }
}

AOP动态代理如何被拦截的将会有一篇单独的文章介绍,最后被调用的就是执行了AsyncExecutionInterceptor.invoke
调用了父类方法AsyncExecutionAspectSupport#determineAsyncExecutor获取线程执行器,通过看代码得到的是默认的执行器this.defaultExecutor.get()

问题解决方法

给@Async 和 @Scheduled创建单独的线程池

@Configuration
@EnableAsync
public class AsyncConfigration implements AsyncConfigurer {
    private static final int SCHEDULE_CORE_POOL_SIZE = 30;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(SCHEDULE_CORE_POOL_SIZE);
        executor.initialize();
        return executor;
    }
}
@Configuration
@EnableScheduling
public class ScheduleConfigration implements SchedulingConfigurer {
    private static final int SCHEDULE_CORE_POOL_SIZE = 10;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(SCHEDULE_CORE_POOL_SIZE);
        taskRegistrar.setScheduler(poolExecutor);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/322515.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号