前言1.注解使用2.yml配置3.获取yml配置信息4.线程池配置5.启动类6.调用方法2种方式
使用注解@Async调用 实践过程中的坑和经验包
ThreadPoolTaskExecutor 和 ThreadPoolExecutorlinkedBlockingQueue 和 ArrayBlockingQueue
前言最近做项目,有许多业务需要处理,放到了kafka中,为了提高消费kafka效率,引入了线程池,不同的业务处理使用不同的线程池。其他暂且不论,直接上配置。
1.注解使用提前说明下使用地方,有个印象,后需还会讲到。
@EnableAsync 这里会配置在2个地方:启动类和线程池配置类@Async 可以写在类上或者方法上@Component 注册类到ioc@ConfigurationProperties 读取yml配置@Autowired@Qualifier 2.yml配置
配置线程池基本信息
# 线程池配置
async-pool:
# 消费kafka使用的线程池
consumer-kafka:
corePoolSize: 2
maxPoolSize: 4
keepAliveSeconds: 120
queueCapacity: 10
# 正常采集上传文件线程池
normal-collection:
corePoolSize: 2
maxPoolSize: 4
keepAliveSeconds: 120
queueCapacity: 10
# 搬迁失败的文件重新上传线程池
fail-file:
corePoolSize: 2
maxPoolSize: 4
keepAliveSeconds: 120
queueCapacity: 10
3.获取yml配置信息
创建父类抽象类 AbstractExecutorPool
分别创建子类继承父类 ConsumerKafkaPool 、NormalCollectionPool、FailFilePool
@Data
public abstract class AbstractExecutorPool {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
}
@Component
// 读取yml配置,会自动映射
@ConfigurationProperties(prefix = "async-pool.consumer-kafka")
@Data
public class ConsumerKafkaPool extends AbstractExecutorPool {
// 线程前缀
private String threadNamePrefix = "handler consumer kafka executor-";
}
@Component
@ConfigurationProperties(prefix = "async-pool.normal-collection")
@Data
public class NormalCollectionPool extends AbstractExecutorPool {
private String threadNamePrefix = "handler norma collection executor-";
}
@Component
@ConfigurationProperties(prefix = "async-pool.fail-file")
@Data
public class FailFilePool extends AbstractExecutorPool {
private String threadNamePrefix = "handler move fail file executor-";
}
4.线程池配置
创建类 ThreadPoolConfig
线程池具体执行步骤,这里不提,说下拒绝策略 ThreadPoolExecutor 类中提供的前4个拒绝策略,也可以自定义策略。
AbortPolicy 默认策略,队列满时抛出异常RejectedExecutionExceptionDiscardOldestPolicy 去除队列中最早的任务,将新任务放入队列DiscardPolicy 直接丢掉任务CallerRunsPolicy 队列满时,主线程执行任务自定义处理策略
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
private final ConsumerKafkaPool consumerKafkaPool;
private final NormalCollectionPool normalCollectionPool;
private final FailFilePool failFilePool;
// 构造器注入,spring4.x以后推荐使用
@Autowired
public ThreadPoolConfig(ConsumerKafkaPool consumerKafkaPool, NormalCollectionPool normalCollectionPool, FailFilePool failFilePool){
this.consumerKafkaPool = consumerKafkaPool;
this.normalCollectionPool = normalCollectionPool;
this.failFilePool = failFilePool;
}
@Bean(name = "asyncExecutorConsumerKafka")
public Executor asyncExecutorConsumerKafka() {
return initExcutor(consumerKafkaPool, consumerKafkaPool.getThreadNamePrefix(), (r, executor) -> {
log.info("队列已满,根据业务自行处理。。。");
});
}
@Bean(name = "asyncExecutorNormalCollection")
public Executor asyncExecutorNormalCollection() {
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return initExcutor(normalCollectionPool, normalCollectionPool.getThreadNamePrefix(), callerRunsPolicy);
}
@Bean(name = "asyncExecutorFailFile")
public Executor asyncExecutorFailFile() {
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return initExcutor(failFilePool, failFilePool.getThreadNamePrefix(), callerRunsPolicy);
}
private Executor initExcutor(AbstractExecutorPool abstractExecutorPool,String threadName, RejectedExecutionHandler rejectedExecutionHandler){
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(abstractExecutorPool.getCorePoolSize());
threadPool.setMaxPoolSize(abstractExecutorPool.getMaxPoolSize());
threadPool.setKeepAliveSeconds(abstractExecutorPool.getKeepAliveSeconds());
threadPool.setQueueCapacity(abstractExecutorPool.getQueueCapacity());
threadPool.setThreadNamePrefix(threadName);
threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
return threadPool;
}
}
5.启动类
启动类上 加注解 @EnableAsync ,这里就不贴代码了
6.调用方法2种方式创建接口
public interface TestService {
void asynctest(String value);
}
使用注解@Async
该注解可以使用在方法或者类上,使用在方法上生命该方法可异步执行,在类上,该类所有方法可异步执行。
@Slf4j
@Service
public class TestServiceImpl implements TestService {
@Override
// 使用注解调用,不使用的请去掉
@Async
public void asynctest(String value) {
String threadName = Thread.currentThread().getName();
log.info(">>线程{},正在处理:{}", threadName, value);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(">>线程test{},处理完成:{}", threadName, value);
}
}
调用
@Component
@Slf4j
public class KafkaCustomTest {
@Autowired
private TestService testService ;
@Autowired
@Qualifier("asyncExecutorConsumerKafka")
private Executor asyncExecutorConsumerKafka;
@KafkaListener(topics = { "test" }, autoStartup = "true")
public void normal(ConsumerRecord record, Acknowledgment ack) throws IOException, InterruptedException {
String value = record.value();
// 使用注解调用start
testService.asynctest(value);
// 使用注解调用end
// 不使用使用注解调用start 不适用注解调用请去掉实现类中的@Async
// asyncExecutorConsumerKafka.execute(()->{
// moveFileService.moveFile(moveFile);
// });
// 不使用使用注解调用end
long offset = record.offset();
log.info(">>>该消息offset:{},消息:{},已提交异步处理。", offset, value);
ack.acknowledge();
}
}
实践过程中的坑和经验包
ThreadPoolTaskExecutor 和 ThreadPoolExecutor
顺便提一句 ThreadPoolTaskExecutor 和 ThreadPoolExecutor 区别
ThreadPoolTaskExecutor 是spring提供的ThreadPoolExecutor 是jdk提供的
ThreadPoolExecutor 可以在代码中写成静态的增长调用,
如:
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 20L,
TimeUnit.SECONDS, new ArrayBlockingQueue(1),
new ThreadPoolExecutor.AbortPolicy());
public static ThreadPoolExecutor getPool() {
return pool;
}
但是 ThreadPoolTaskExecutor 写成静态的就会失效,具体区别了解不深。
linkedBlockingQueue 和 ArrayBlockingQueuelinkedBlockingQueue中的锁是分离的,生产者的锁PutLock,消费者的锁takeLockArrayBlockingQueue生产者和消费者使用的是同一把锁
ThreadPoolExecutor 可以自己指定使用那个队列。
ThreadPoolTaskExecutor 使用的linkedBlockingQueue
protected BlockingQueuecreateQueue(int queueCapacity) { if (queueCapacity > 0) { return new linkedBlockingQueue<>(queueCapacity); } else { return new SynchronousQueue<>(); } }
好好学习,天天努力



