总体结构
发送过程
直接与具体broker连接进行消息发送分区器,默认使用DefaultPartitioner:有key默认使用hash,除修改分区数量是,可以保证相同key在一个分区;没有key,通过计数%分区数进行循环发送。业务线程通过KafkaProducer.send()方法不断向RecordAccumulator追加消息,当达到一定的条件,会唤醒Sender线程发送RecordAccumulator中的消息。KafkaProduce为线程安全,consumer才是非线程安全可以使用拦截器修改数据
重要参数
zookeeper.max.in.flight.requests 每个broker/node连接的最大等待发送的请求数量,2.8默认10batch.size:批量消息最大数,设置为0代表不支持批量发送,默认16384bootstrap.servers:启动时候连接的servers,但producer实际会通过metadata获取到所有broker 信息并可能创建连接metadata.max.age.ms:元数据最大cache时间,超过即重新请求acks 1:只需要leader写入就认为发送成功;0: 不等待,只能用于可以丢失消息的场景;-1或者all,必须不少于min.insync.replicas的副本写入数据min.insync.replicasmax.request.size:消息最大size,默认1Mretry.backoff.ms 发送失败后等待间隔,默认5秒retries 发送失败重试次数,默认为0;如果要设置大于0,需要考虑好retry.backoff.ms默认5秒是否合适compression.type:消息压缩类型,默认为NONE,采用压缩可以提高吞吐率,但时延加大linger.ms 默认为0,批量消息逗留时长,可以用于降低批量发送请求次数,但会加大延迟receive.buffer.bytes:tcp 接收buffer,-1采用os默认,默认32Ksend.buffer.bytes :tcp 接收buffer,-1采用os默认,默认128Kbuffer.memory 消息缓存区大小,默认32M,但不是hard bound,如压缩等可能操作可能其它内存 Consumer
数据获取
如果发起多次topic订阅,以最后订阅为准订阅指定分区 consumer.assign(Arrays.asList(new TopicPartition(“topic-demo”,0))); 可以通过partitionsFor得到分区所有partition信息暂停/重启消费 pause/resume方法使用拉方式,offset保存在内部compact topic __consumer_offsets.拦截器;Consumer 会在POLL方法返回之前调用拦截器的onConsume方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息,比如降低返回的消息的个数
offset
offset提交;自动提交(默认、不推荐)enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移同步提交-commitSync,默认方法会根据poll()方法拉取的最新位移来进行提交,也可以设置offsets 参数, 用来提交指定分区的位移异步提交commitAsync;需要考虑异常处理,比如失败就使用同步提交使用seek从指定offset开始消费:seekToBeginning、seekToEnd、offsetsForTimesoffsets.retention.minutes 默认7天,消费者offset最长保留时间,如果超过此时长重启消费,offset是不存在的,改为由auto.offset.reset决定
线程安全
consumer为非线程安全类可以使用netty 等使一个consumer固定在对应线程中,实现acquire和release函数在同一线程也可以poll和处理分离,处理使用线程池进行,配合异步提交模式,这种模式需要考虑同一key是否需要顺序处理、offset提交及部分处理失败的弥补机制
分区分配策略
参数:partition.assignment.strategy
org.apache.kafka.clients.consumer.RangeAssignor: The default assignor, which works on a per-topic basis.
org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns partitions to consumers in a round-robin fashion.
推荐-org.apache.kafka.clients.consumer.StickyAssignor:可以实现分区的分配要尽可能的均匀,分区的分配尽可能的与上次分配的保持相同,Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
推荐-org.apache.kafka.clients.consumer.CooperativeStickyAssignor: 在 StickyAssignor 分配策略的基础上,再增加渐进式的重平衡
Follows the same StickyAssignor logic, but allows for cooperative rebalancing. 使用Incremental Cooperative Rebalancing 协议
自定义:实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor interface
参考URL https://juejin.cn/post/6988053050392838151
重要参数
bootstrap.serversgroup.id 归属消费组,kafka没有广播消息一说,只能通过不同消费者实现类似功能key.deserializervalue.deserializerfetch.max.bytes 默认55Mfetch.max.wait.ms 最大等待时长,默认500MSsend.buffer.bytes tcp 接收buffer,-1采用os默认,默认128Kreceive.buffer.bytes tcp 接收buffer,-1采用os默认,默认32Kmetadata.max.age.ms:元数据最大cache时间,超过即重新请求retry.backoff.ms 失败后等待间隔,默认5秒retries 重试次数,默认为0,如果要设置大于0,需要考虑好retry.backoff.ms默认5秒是否合适reconnect.backoff.ms
rebalance
除正常增加consumer和consumer丢失外应该尽量避免rebalance,rebalance带来的问题
1、可能重复消费: Consumer被踢出消费组,可能还没有提交offset,Rebalance时会Partition重新分配其它Consumer,会造成重复消费,虽有幂等操作但耗费消费资源,亦增加集群压力
2、集群不稳定:Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
3、影响消费速度:Rebalance 期间整个group暂停消费,频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
避免措施
避免未能及时发送心跳而Rebalance:heartbeat.interval.ms 心跳时间,一般为超时时间的1/3,Consumer在被判定为死亡之前,能够发送至少 3 轮的心跳请求session.timeout.ms 一次session的连接超时时间避免Consumer消费超时而Rebalance:max.poll.interval.ms 每隔多长时间去拉取消息。合理设置预期值,消费者要尽量间隔时间内处理完业务逻辑并再次pollmax.poll.records 一次从拉取出来的数据条数。根据消费业务处理耗费时长合理设置
避免rebalance等带来重复消费
在ConsumerRebalanceListener重获取offset在处理过程中,保存offset到redis等如果可以重复消费,不必特别处理
过程图
参考:https://my.oschina.net/u/4052033/blog/3142388
主题和分区建议
关闭自动创建topic,设置auto.create.topics.enable=false使用rack:一个partition将横跨 min(racks, replication-factor) 个不同的rack
所有broker要指定归属rack, broker.rack=my-rack-id使用–config options 指定topic的属性值
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1
–replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
参考 https://kafka.apache.org/28/documentation.html#topicconfigs
TOPIC属性
生命周期管理
cleanup.policydelete.retention.msfile.delete.delay.msretention.bytesretention.mssegment.jitter.mssegment.ms:This configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn’t full to ensure that retention can delete or compact old data. 数据格式
compression.typemax.message.bytes,默认1Mmessage.format.version:kafka版本号message.timestamp.type: CreateTimeorLogAppendTime`message.downconversion.enable 磁盘及复制等操作
flush.messages 默认无穷大即不生效flush.ms flush间隔时间follower.replication.throttled.replicasleader.replication.throttled.replicasmax.compaction.lag.msmin.cleanable.dirty.ratio,用于compact清理,默认50%min.compaction.lag.msmin.insync.replicaspreallocate:是否预占磁盘空间segment.bytes:单个数据文件最大sizesegment.index.bytesunclean.leader.election.enable index管理
index.interval.bytes 其它
message.timestamp.difference.max.ms,消息发送时间与broker接收时间最大间隔,默认不检查
效验topic合规性
alter.config.policy.class.name: implement the org.apache.kafka.server.policy.AlterConfigPolicy interface.
create.topic.policy.class.name: implement the org.apache.kafka.server.policy.CreateTopicPolicy interface.
分区管理
kafka创建新topic时候会尽量保证preferred replica/分区leader replica 在broker/rack间的均衡,这个均衡是相对的、比如没有考虑topic繁忙度
leader再均衡
自动均衡-默认打开 auto.leader.rebalance.enable ,默认为true,建议关闭以避免高峰期间启动rebalance,相关参数:
leader.imbalance.per.broker.percentageleader.imbalance.check.interval.seconds
手动均衡 使用kafka-preferred-replica-election.sh;可以使用path-to-json-file参数来小批量地对部分分区执行优先副本的选举操作。
选举过程数据保存在ZooKeeper的/ admin/preferred replica election 节点
分区重分配和修改分区数量
使用kafka-reassign-partitions.sh 在broker上重新分配分区,一般步骤:观察现有分布、生成变更json、执行效验
可以复制过程中在broker或者topic进行限流,相关参数
leader.replication.throttled.replicasfollower.replication.throttled.replicas
可以使用kafka-preferred-replica-election.sh或者kafka-configs.sh进行限制,注意完成后要关闭限流。
如何选择合适分区数量
压力测试:kafka-producer-perf-test.sh,kafka-consumer-perf-test.sh分区数量与性能不是线性增长,在有限硬件下分区数量过多可能导致性能下降分区多,需要打开的文件句柄多



