@Configuration
@EnableAsync
public class ThreadAsyncConfigurer implements AsyncConfigurer {
private final static Logger log = LoggerFactory.getLogger(ThreadAsyncConfigurer.class);
@Bean
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
// 设置核心线程数
threadPool.setCorePoolSize(Runtime.getRuntime().availableProcessors());
// 设置最大线程数
threadPool.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 100);
// 线程池所使用的缓冲队列
threadPool.setQueueCapacity(0);
// 等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteonShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60);
// 设置线程名前缀
threadPool.setThreadNamePrefix("AsyncExecutor-");
// 设置拒绝策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
log.error("==========================" + throwable.getMessage() + "=======================", throwable);
log.error("exception method:" + method.getName());
for (Object param : obj) {
log.error("Parameter value - " + param);
}
}
}
}
@Configuration
@EnableAsync
public class ThreadTaskConfig {
@Bean("taskExecutor")
public Executor taskExecutor(){
ThreadPoolTaskExecutor poolExecutor = new ThreadPoolTaskExecutor();
//设置核心线程数 如果是CPU密集型任务,最佳线程数为CPU+1 如果是IO密集型任务,最佳线程数为CPU*2
poolExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()+1);
//设置最大线程数
poolExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 100);
//允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
poolExecutor.setKeepAliveSeconds(60);
//缓冲队列200:用来缓冲执行任务的队列
poolExecutor.setQueueCapacity(200);
// 等待任务在关机时完成--表明等待所有线程执行完
poolExecutor.setWaitForTasksToCompleteonShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待60秒后强制停止
poolExecutor.setAwaitTerminationSeconds(60);
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
poolExecutor.setThreadNamePrefix("taskExecutor-");
//拒绝策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;
// 如果执行程序已关闭,则会丢弃该任务
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return poolExecutor;
}
@Bean("myExecutor")
public Executor myExecutor(){
//自定义jdk线程池七大核心参数
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 100,
120L,
//允许线程空闲时间和单位,超过了核心线程出之外的线程在空闲时间到达之后会被销毁
TimeUnit.SECONDS,
//指定一种队列 (有界队列 先进先出)
new ArrayBlockingQueue<>(100),
//默认线程工厂
Executors.defaultThreadFactory(),
//自定义拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
return poolExecutor;
}
}