栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ThreadPoolTaskExecutor线程池和@Async异步执行

ThreadPoolTaskExecutor线程池和@Async异步执行

需求

        在执行的时候需要同步用户信息,修改用户状态,同步信息的时候需要查询用户之前的记录并同步到ES中

 问题

1、在报名的时候发现因为用户之前的记录比较多,会执行的比较慢,这样会一直报名中

2、用户在多次点击或者网络环境不好的情况下多次请求导致重复计算

解决

除了主线报名之外其他的计算、同步过程都异步执行,或者使用定时任务定时扫描

1、定时任务定时扫描,需要将信息压入队列,然后定期执行,消耗队列使用线程池

 

技术点

使用线程池执行队列:springboot线程池执行类ThreadPoolExecutor

在springboot文档中指出,没有配置线程池的话,springboot会自动配置一个ThreadPoolExecutor到Bean中

1、首先引入线程池配置类

先上代码:

package com.lz.common.config.thread;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;


@Component
@ConfigurationProperties("threadpool")
@Data
public class ThreadPoolProperties {


    
    private ThreadPoolInstance poolInstance = new ThreadPoolInstance(){
        {
            this.setCorePoolSize(5);
            this.setMaxNumPoolSize(5);
            this.setKeepAliveSeconds(60);
            this.setWorkQueueCapacity(1000);
        }
    };

    @Data
    public class ThreadPoolInstance{
        
        private Integer corePoolSize;

        
        private Integer maxNumPoolSize;

        
        private Integer workQueueCapacity;

        
        private Integer keepAliveSeconds;
    }



}
1、@Component :标识一个bean
2、@ConfigurationProperties("threadpool"):读取配置文件
3、@Data :lamada

使用这种方式不适应@Value方式是因为1、可以设置默认配置,2、如果配置文件中没有这个配置不会报错,3、可以设置多个默认配置

使用方式
    @Autowired(required = false) //spring在启动的时候会注入bean 但是在扫描的时候没有发现这个bean,强行注入就会失败,使用这个是存在bean就注入
    private ThreadPoolProperties threadPoolProperties;

...
Integer corePoolSize = threadPoolProperties.getPoolInstance().getCorePoolSize()
2、编写基本ThreadPoolExecutor调用类
package com.lz.common.config.thread;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@EnableAsync
@Slf4j
public class TaskExecuterConfiguration {


    @Autowired(required = false) //spring在启动的时候会注入bean 但是在扫描的时候没有发现这个bean,强行注入就会失败,使用这个是存在bean就注入
    private ThreadPoolProperties threadPoolProperties;

    
    @Bean
    @Primary //设置默认
    public ThreadPoolTaskExecutor callRunsExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置线程池参数
        executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
        executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
        executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
        //配置线程池中线程名称前缀
        executor.setThreadNamePrefix("callRunsExcutor-");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化
        executor.initialize();
        return executor;
    }

    
    @Bean
    public ThreadPoolTaskExecutor abortExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置线程池参数
        executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
        executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
        executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
        //配置线程池中线程名称前缀
        executor.setThreadNamePrefix("abortExcutor-");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //初始化
        executor.initialize();
        return executor;
    }

 
     @Bean
     public ThreadPoolTaskExecutor disCardOldsExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         //设置线程池参数
         executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
         executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
         executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
         executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
         //配置线程池中线程名称前缀
         executor.setThreadNamePrefix("disCardOldsExcutor-");
         //设置拒绝策略
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
         //初始化
         executor.initialize();
         return executor;
     }


    @Bean
    public ThreadPoolTaskExecutor discardExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置线程池参数
        executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
        executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
        executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
        //配置线程池中线程名称前缀
        executor.setThreadNamePrefix("discardExcutor-");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        //初始化
        executor.initialize();
        return executor;
    }

}
3、调用方式

1

  @Resource(name = "userHoursExecutor")
  private ThreadPoolTaskExecutor userHoursExecutor;

  userHoursExecutor.execute(() -> {
    //执行代码
  });

2、在方法上或者类上执行异步

    @Async("配置中的bean名称")

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/662923.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号