栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka消费者订阅方式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka消费者订阅方式

Kafka消费者订阅方式
    • 1、指定主题消费
    • 2、指定分区消费
    • 3、取消订阅
    • 4、总结

Kafka为消费者提供了三种类型的订阅消费方式:订阅主题集合、正则表达式订阅主题、订阅指定主题的分区集合。三种方式只能使用其中一种。

1、指定主题消费

一个消费者可以使用KafkaConsumer提供的subscribe()方法订阅一个或多个主题,订阅主题集合和正则表达式订阅主题都使用此方法实现的。下面两种方式都可以订阅topic_1120主题。

KafkaConsumer consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("topic_1120"));
KafkaConsumer consumer = new KafkaConsumer<>(props);

//正则表达式.*代表后续0个或者多个任意字符。
consumer.subscribe(Pattern.compile("topic.*"));

订阅主题在源码中由4个方法重载实现,其中两个带listener的方法是可以自定义Rebalance重平衡的监听类。

@Override
public void subscribe(Collection topics) {
    subscribe(topics, new NoOpConsumerRebalanceListener());
}

@Override
public void subscribe(Collection topics, ConsumerRebalanceListener listener) {
    //省略源码
}

@Override
public void subscribe(Pattern pattern) {
    subscribe(pattern, new NoOpConsumerRebalanceListener());
}

@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    //省略源码
}
2、指定分区消费

消费者指定分区消费是通过KafkaConsumer提供的assign()方法实现的,assign()方法入参为Collection, 其中TopicPartition有2个属性, topic和partition, 分区从0开始编号。使用assign()方法订阅指定主题test_1120分区0的消息。

/订阅指定分区
consumer.assign(Collections.singleton(new TopicPartition("topic_1120", 0)));
3、取消订阅

取消订阅调用unsubscribe()方法。

consumer.unsubscribe();
4、总结

subscribe()具有自动重平衡的功能,来实现消费负载均衡和故障自动转移,而assign()不具备这种功能。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/591846.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号