- 第一种方法:配置一个线程池
- 第二种方法:实现 AsyncConfigurer 接口
关键字:SpringBoot 异步执行方法、Spring 异步执行有返回值的方法
在 Spring 中,我们可以找到 @EnableAsync 注解,通过月的该注解的作用,大致有这么几点信息:
① 该注解的作用是开启 SpringBoot 异步执行方法的能力
② @EnableAsync 注解必须要和 @Configuration 注解一起使用
@Configuration
@EnableAsync
public class AppConfig {
}
③ 如果没有配置线程池而直接使用 @Async 注解,此时会使用默认策略
1. 先在 Context 中根据类型寻找 org.springframework.core.task.TaskExecutor 对象 2. 如果没有找到,则在 Context 中根据类型寻找 java.util.concurrent.Executor 对象且名称为 'taskExecutor' 3. 如果仍然没有找到,则创建 org.springframework.core.task.SimpleAsyncTaskExecutor 用于执行异步方法
④ 异步执行有两种方法
// 第一种 定义一个线程池,并 @EnableAsync 开启异步,在需要异步执行的方法上增加 @Async // 第二种 实现 AsyncConfigurer 接口,在需要异步支持的方法上使用 @Async第一种方法:配置一个线程池
① 配置线程池
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import lombok.Setter;
@Setter
@EnableAsync
@Configuration
@ConfigurationProperties(prefix = "task.pool")
public class ThreadPoolConfig {
private int corePoolSize = 5;
private int maxPoolSize = 10;
private int keepAliveTime = ((int) TimeUnit.SECONDS.toSeconds(30));
private int queueCapacity = 1000;
private static final String THREAD_PREFIX = "thread-call-runner-%d";
@Bean
@Lazy
public ThreadPoolTaskExecutor threadPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix(THREAD_PREFIX);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
② 在配置文件中进行配置
task:
pool:
corePoolSize: 5
maxPoolSize: 20
keepAliveSeconds: 300
queueCapacity: 50
③ 测试:向线程池中提交任务
提交多个任务,测试多个线程的累加是否正确
import java.util.concurrent.atomic.LongAdder;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class AutomaticIncTask implements Runnable {
private LongAdder adder;
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
adder.increment();
}
}
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import org.example.task.AutomaticIncTask;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
@SpringBootTest
public class ThreadPoolConfigTest {
@Autowired
private ThreadPoolTaskExecutor executor;
@Test
public void testThreadPoolTaskExecutor() {
LongAdder adder = new LongAdder();
List> futures = new ArrayList<>();
for (int i = 0; i < 3; i++) {
// 注意这里
// 这里提交的是无返回值的任务,有返回值的任务需要实现 Callable 接口
// futures 用于收集线程的监听结果,由于是异步的,主线程是不会等待线程池里面的任务的
futures.add(executor.submitListenable(new AutomaticIncTask(adder)));
}
// 附带阻塞主线程效果,目的是为了让主线程等待线程池中的线程执行完毕
futures.forEach(ListenableFuture::completable);
System.out.println("执行完毕:" + adder.longValue());
}
}
④ 在方法上使用 @Async 开启异步任务
@Service
public class AsyncServiceImpl implements AsyncService {
@Async("asyncThreadPool")
@Override
public void asyncNoReturn() {
new AutomaticIncTask(new LongAdder()).run();
}
@Async("asyncThreadPool")
@Override
public ListenableFuture asyncReturn() throws Exception {
// 这里可以改写成执行多个任务(增加任务的并行执行能力),对任务的返回值进行过滤
// 1. 如果全部没有包含错误信息,则返回结果为 success
// 2. 如果其中一个返回值包含错误信息,则返回结果为 fail
return new AsyncResult<>(new CallAbleTask(new LongAdder()).call());
}
}
第一种:执行无返回值的方法
@GetMapping("/hello")
public void hello() {
service.asyncNoReturn();
}
第二种:执行有返回值的方法
import java.util.concurrent.Callable; import java.util.concurrent.atomic.LongAdder; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; @NoArgsConstructor @AllArgsConstructor public class CallAbleTask implements Callable{ private LongAdder adder; @Override public String call() { String result = "success"; try { // 执行成功 for (int i = 0; i < 1000; i++) { adder.increment(); } } catch (Exception e) { // 执行错误 result = "fail"; } return result; } }
注意:有返回值的需要通过 org.springframework.util.concurrent.ListenableFuture 包装,具体请查看 @Async 注解的介绍
@GetMapping("/hi")
public String hi() throws Exception {
ListenableFuture future = service.asyncReturn();
return future.get();
}
第二种方法:实现 AsyncConfigurer 接口
这个方法没有测试,自行测试
① 实现接口
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("thread-call-runner-%d");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncUncaughtExceptionHandler();
}
}
② 在方法上使用 @Async



