栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka学习笔记(二)Kafka操作实践

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka学习笔记(二)Kafka操作实践

Kafka学习笔记(二)Kafka操作实践

Kafka操作实践

分区机制和策略建议

为啥要分区?分区策略

轮询策略随机策略(不推荐)Key-ordering 策略 无消息丢失配置

Producer端Broker端Consumer端 消费者组机制

基础定义重平衡(Rebalance)基础定义触发条件问题风险组成员数发生变更导致重平衡优化方法

场景1:未能及时发送心跳,导致 Consumer 被“踢出”Group场景2:Consumer 消费时间过长 分区位移提交机制

【参考】自动提交【参考】手动提交——同步提交【参考】手动提交——异步提交【建议】手动提交——组合异步和手动提交 常见CommitFailedException场景

消息处理超时 多线程Consumer端方案

Kafka操作实践 分区机制和策略建议 为啥要分区?

将大批量的数据均匀分布在多个Broker上,实现负载均衡,实现系统的高伸缩性。
衍生出消息顺序的问题:kafka上,要求保证顺序的消息发往同一分区,是保证消息顺序传输的充分非不要条件。

分区策略 轮询策略

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

随机策略(不推荐)

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。

Key-ordering 策略
// 获取指定topic的分区信息
List partitions = cluster.partitionsForTopic(topic);
// 根据对key的hash计算,对到指定分区上
return Math.abs(key.hashCode()) % partitions.size();
无消息丢失配置

问:Kafka 到底在什么情况下才能保证消息不丢失?
答:Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

Producer端
    Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。并妥善处理好Callback反馈失败的场景。设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。设置 retries 为一个较大的值。retries是 Producer 的参数,是 Producer的自动重试机制。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
Broker端
    设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。设置 replication.factor >= 3。这是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
Consumer端
    确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
消费者组机制 基础定义

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
组内可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。
组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
三个特性:

    Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些;Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group;Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
    理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
    Consumer Group 将位移保存在 Broker 端的内部主题中。
重平衡(Rebalance) 基础定义

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

触发条件

Rebalance 的触发条件有 3 个。

    组成员数发生变更(99%发生概率,可尽量减少)。
    比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。订阅主题数发生变更(一般为主动运维操作,不可避免)。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。订阅主题的分区数发生变更(一般为主动运维操作,不可避免)。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
问题风险
    在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成;所有 Consumer 实例共同参与,全部重新分配所有分区。导致所有客户端TCP连接重新创建;Rebalance 实际对Consumer的TPS影响很大,社区暂时没有方法解决。
组成员数发生变更导致重平衡优化方法 场景1:未能及时发送心跳,导致 Consumer 被“踢出”Group

建议,设置Consumer端参数:

// 会话超时时间
session.timeout.ms = 6000
// 心跳超时时间
heartbeat.interval.ms = 2000

session.timeout.ms >= 3 * heartbeat.interval.ms 保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求。

场景2:Consumer 消费时间过长

消费端业务处理超时时间过长。
慎重考虑在消费消息的同线程中处理业务逻辑,消费端业务处理超时时间设置应考虑到同线程中处理业务逻辑用时。
同时应该避免消费端频繁出现Full GC,导致消费时间被拖长;
参考Consumer端参数:

max.poll.interval.ms

分区位移提交机制

kafka位移提交的语义保障是由客户端使用方来负责的,Kafka 只会“无脑”地接受提交的位移。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

【参考】自动提交

Kafka默认为自动提交,默认自动提交周期为5秒,相关Consumer参数:

// 自动提交
enable.auto.commit = true
// 自动提交周期设置为5s
auto.commit.interval.ms = 5000

Demo代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    try {
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    } catch(Exception e) {
        // 处理异常
        handle(e);
    }
}

优点:
自动提交方式下,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
不足:
自动提交位移的一个问题在于,它可能会出现重复消费(发生“重平衡”,未提交的部分可能被重新消费)。

【参考】手动提交——同步提交

Demo代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    try {
        ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
        // 处理消息
        process(records);
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // 处理提交失败异常
            handle(e); 
        }
    } catch(Exception e) {
        // 处理异常
        handle(e);
    }
}

优点:客户端完全能够把控位移提交的时机和频率;
不足:在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束,会影响整个应用程序的 TPS。

【参考】手动提交——异步提交

Demo代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    try {
        ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
        // 处理消息
        process(records);
        consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); });
    } catch(Exception e) {
        // 处理异常
        handle(e);
    }   
}

优点:
调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等;
不足:
出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

【建议】手动提交——组合异步和手动提交

Demo代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while(true) {
    try {
        while(true) {
            ConsumerRecords records = 
                    consumer.poll(Duration.ofSeconds(1));
            // 处理消息
            process(records);
            // 使用异步提交规避阻塞
            consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); });
        }
    } catch(Exception e) {
        // 处理异常
        handle(e);
        try {
            // 最后一次提交使用同步阻塞式提交
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}
常见CommitFailedException场景 消息处理超时

Consumer端消息处理超时;
解决方案:

    缩短单条消息处理的时间——优化处理逻辑;增加 Consumer 端允许下游系统消费一批消息的最大时长——max.poll.interval.ms参数;减少Consumer 端一次性消费的消息总数——max.poll.records,默认500条;下游系统使用多线程来加速消费——很多开源框架玩的方法,例如:Flink。并发场景下,存在唯一提交处理难度。
多线程Consumer端方案

KafkaConsumer 类不是线程安全的;
——否则ConcurrentModificationException
方案一:
消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。


方案二:
消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

方案比较:

相关内容:

Kafka学习笔记(一)Kafka基础概念理解

Kafka学习笔记(三)Kafka基础设施评估及服务器端配置

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735713.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号