- 1、指定主题消费
- 2、指定分区消费
- 3、取消订阅
- 4、总结
Kafka为消费者提供了三种类型的订阅消费方式:订阅主题集合、正则表达式订阅主题、订阅指定主题的分区集合。三种方式只能使用其中一种。 1、指定主题消费
一个消费者可以使用KafkaConsumer提供的subscribe()方法订阅一个或多个主题,订阅主题集合和正则表达式订阅主题都使用此方法实现的。下面两种方式都可以订阅topic_1120主题。
KafkaConsumerconsumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Collections.singletonList("topic_1120"));
KafkaConsumerconsumer = new KafkaConsumer<>(props); //正则表达式.*代表后续0个或者多个任意字符。 consumer.subscribe(Pattern.compile("topic.*"));
订阅主题在源码中由4个方法重载实现,其中两个带listener的方法是可以自定义Rebalance重平衡的监听类。
@Override public void subscribe(Collection2、指定分区消费topics) { subscribe(topics, new NoOpConsumerRebalanceListener()); } @Override public void subscribe(Collection topics, ConsumerRebalanceListener listener) { //省略源码 } @Override public void subscribe(Pattern pattern) { subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { //省略源码 }
消费者指定分区消费是通过KafkaConsumer提供的assign()方法实现的,assign()方法入参为Collection, 其中TopicPartition有2个属性, topic和partition, 分区从0开始编号。使用assign()方法订阅指定主题test_1120分区0的消息。
/订阅指定分区
consumer.assign(Collections.singleton(new TopicPartition("topic_1120", 0)));
3、取消订阅
取消订阅调用unsubscribe()方法。
consumer.unsubscribe();4、总结
subscribe()具有自动重平衡的功能,来实现消费负载均衡和故障自动转移,而assign()不具备这种功能。



