配置KafkaListener进行控制最近在做一个SpringBoot整合Kafka的一个项目,需要控制Kafka客户端消费数据的停止与启动,遇到一个问题,排查下来感觉对自己有一定帮助,趁此记录一下。
1. 配置KafkaListenerContainer工厂,禁止自启动。
@Configuration
public class KafkaConfiguration {
private static Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactorys() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
}
2. 配置@KafkaListener,带上之前配置的容器工厂类
@KafkaListener(id="test_cep_consumer", topics = "test_cep", errorHandler="consumerAwareErrorHandler", containerFactory = "delayContainerFactorys") public void onMessage(Listevents) throws Exception { logger.info("批量消费一次..."); }
3. 在启动方法里,控制启动消费
@PostConstruct
public void startUp() {
...
consumer.startConsume();
...
}
@Autowired
private KafkaListenerEndpointRegistry registry;
public void startConsume() {
// Start consume Kafka
if (!registry.getListenerContainer("test_cep_consumer").isRunning()) {
registry.getListenerContainer("test_cep_consumer").start();
}
}
实际运行时会发现registry.getListenerContainer("test_cep_consumer")取出来的是null。
排查首先通过查看KafkaListenerEndpointRegistry源码发现,它是使用如下方法进行注册的。
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory> factory) {
registerListenerContainer(endpoint, factory, false);
}
那么它又是在哪里被调用的呢?通过搜索阅读代码发现KafkaListenerEndpointRegistrar这个类进行了真正注册,而这个类实现了Spring的InitializingBean接口,它的执行期即调用afterPropertiesSet()方法比@PostConstruct注解的方法晚,所以在调用@PostConstruct注解的方法时其尚未成功注册Listener进去。
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
}



