Kafka操作实践
分区机制和策略建议
为啥要分区?分区策略
轮询策略随机策略(不推荐)Key-ordering 策略 无消息丢失配置
Producer端Broker端Consumer端 消费者组机制
基础定义重平衡(Rebalance)基础定义触发条件问题风险组成员数发生变更导致重平衡优化方法
场景1:未能及时发送心跳,导致 Consumer 被“踢出”Group场景2:Consumer 消费时间过长 分区位移提交机制
【参考】自动提交【参考】手动提交——同步提交【参考】手动提交——异步提交【建议】手动提交——组合异步和手动提交 常见CommitFailedException场景
消息处理超时 多线程Consumer端方案
Kafka操作实践 分区机制和策略建议 为啥要分区?将大批量的数据均匀分布在多个Broker上,实现负载均衡,实现系统的高伸缩性。
衍生出消息顺序的问题:kafka上,要求保证顺序的消息发往同一分区,是保证消息顺序传输的充分非不要条件。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。
// 获取指定topic的分区信息 List无消息丢失配置partitions = cluster.partitionsForTopic(topic); // 根据对key的hash计算,对到指定分区上 return Math.abs(key.hashCode()) % partitions.size();
问:Kafka 到底在什么情况下才能保证消息不丢失?
答:Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
- Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。并妥善处理好Callback反馈失败的场景。设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。设置 retries 为一个较大的值。retries是 Producer 的参数,是 Producer的自动重试机制。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。设置 replication.factor >= 3。这是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
组内可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。
组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
三个特性:
- Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些;Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group;Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
Consumer Group 将位移保存在 Broker 端的内部主题中。
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
触发条件Rebalance 的触发条件有 3 个。
- 组成员数发生变更(99%发生概率,可尽量减少)。
比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。订阅主题数发生变更(一般为主动运维操作,不可避免)。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。订阅主题的分区数发生变更(一般为主动运维操作,不可避免)。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
- 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成;所有 Consumer 实例共同参与,全部重新分配所有分区。导致所有客户端TCP连接重新创建;Rebalance 实际对Consumer的TPS影响很大,社区暂时没有方法解决。
建议,设置Consumer端参数:
// 会话超时时间 session.timeout.ms = 6000 // 心跳超时时间 heartbeat.interval.ms = 2000
session.timeout.ms >= 3 * heartbeat.interval.ms 保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求。
场景2:Consumer 消费时间过长消费端业务处理超时时间过长。
慎重考虑在消费消息的同线程中处理业务逻辑,消费端业务处理超时时间设置应考虑到同线程中处理业务逻辑用时。
同时应该避免消费端频繁出现Full GC,导致消费时间被拖长;
参考Consumer端参数:
分区位移提交机制max.poll.interval.ms
kafka位移提交的语义保障是由客户端使用方来负责的,Kafka 只会“无脑”地接受提交的位移。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
Kafka默认为自动提交,默认自动提交周期为5秒,相关Consumer参数:
// 自动提交 enable.auto.commit = true // 自动提交周期设置为5s auto.commit.interval.ms = 5000
Demo代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
try {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
} catch(Exception e) {
// 处理异常
handle(e);
}
}
优点:
自动提交方式下,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
不足:
自动提交位移的一个问题在于,它可能会出现重复消费(发生“重平衡”,未提交的部分可能被重新消费)。
Demo代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
try {
ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
// 处理消息
process(records);
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// 处理提交失败异常
handle(e);
}
} catch(Exception e) {
// 处理异常
handle(e);
}
}
优点:客户端完全能够把控位移提交的时机和频率;
不足:在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束,会影响整个应用程序的 TPS。
Demo代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
try {
ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
// 处理消息
process(records);
consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); });
} catch(Exception e) {
// 处理异常
handle(e);
}
}
优点:
调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等;
不足:
出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
Demo代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while(true) {
try {
while(true) {
ConsumerRecords records =
consumer.poll(Duration.ofSeconds(1));
// 处理消息
process(records);
// 使用异步提交规避阻塞
consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); });
}
} catch(Exception e) {
// 处理异常
handle(e);
try {
// 最后一次提交使用同步阻塞式提交
consumer.commitSync();
} finally {
consumer.close();
}
}
}
常见CommitFailedException场景
消息处理超时
Consumer端消息处理超时;
解决方案:
- 缩短单条消息处理的时间——优化处理逻辑;增加 Consumer 端允许下游系统消费一批消息的最大时长——max.poll.interval.ms参数;减少Consumer 端一次性消费的消息总数——max.poll.records,默认500条;下游系统使用多线程来加速消费——很多开源框架玩的方法,例如:Flink。并发场景下,存在唯一提交处理难度。
KafkaConsumer 类不是线程安全的;
——否则ConcurrentModificationException
方案一:
消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。
方案二:
消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。
方案比较:
相关内容:
Kafka学习笔记(一)Kafka基础概念理解
Kafka学习笔记(三)Kafka基础设施评估及服务器端配置



