栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot整合Kafka控制消费启停遇到的问题记录(@KafkaListener注解使用)

SpringBoot整合Kafka控制消费启停遇到的问题记录(@KafkaListener注解使用)

最近在做一个SpringBoot整合Kafka的一个项目,需要控制Kafka客户端消费数据的停止与启动,遇到一个问题,排查下来感觉对自己有一定帮助,趁此记录一下。

配置KafkaListener进行控制

1. 配置KafkaListenerContainer工厂,禁止自启动。

@Configuration
public class KafkaConfiguration {

  private static Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);

  @Autowired
  private ConsumerFactory consumerFactory;


  // 监听器容器工厂(设置禁止KafkaListener自启动)
  @Bean
  public ConcurrentKafkaListenerContainerFactory delayContainerFactorys() {
    ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
    container.setConsumerFactory(consumerFactory);
    //禁止KafkaListener自启动
    container.setAutoStartup(false);
    return container;
  }
}

2. 配置@KafkaListener,带上之前配置的容器工厂类

@KafkaListener(id="test_cep_consumer", topics = "test_cep", errorHandler="consumerAwareErrorHandler", containerFactory = "delayContainerFactorys")
public void onMessage(List events) throws Exception {
  logger.info("批量消费一次...");
}

3.  在启动方法里,控制启动消费

@PostConstruct
public void startUp() {

    ...
    consumer.startConsume();
    ...
}


@Autowired
private KafkaListenerEndpointRegistry registry;

public void startConsume() {
  // Start consume Kafka
  if (!registry.getListenerContainer("test_cep_consumer").isRunning()) {
    registry.getListenerContainer("test_cep_consumer").start();
  }
}

实际运行时会发现registry.getListenerContainer("test_cep_consumer")取出来的是null。

排查

首先通过查看KafkaListenerEndpointRegistry源码发现,它是使用如下方法进行注册的。

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory) {
  registerListenerContainer(endpoint, factory, false);
}

那么它又是在哪里被调用的呢?通过搜索阅读代码发现KafkaListenerEndpointRegistrar这个类进行了真正注册,而这个类实现了Spring的InitializingBean接口,它的执行期即调用afterPropertiesSet()方法比@PostConstruct注解的方法晚,所以在调用@PostConstruct注解的方法时其尚未成功注册Listener进去。

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
  @Override
  public void afterPropertiesSet() {
    registerAllEndpoints();
  }

  protected void registerAllEndpoints() {
	synchronized (this.endpointDescriptors) {
	  for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
		this.endpointRegistry.registerListenerContainer(
			descriptor.endpoint, resolveContainerFactory(descriptor));
	  }
	this.startImmediately = true;  // trigger immediate startup
    }
  }
}

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/672256.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号