一、kafka高级特性
1.1 producer发布消息机制1.2 Controller及partition选举机制
1.2.1 Controller选举机制(zk控制)1.2.2 partition选举Leader机制 1.3 Rebalance再平衡机制
1.3.1 rebalance的前提1.3.2 什么情况下会发生rebalance1.3.3 Rebalance的过程1.3.4 Rebalance分配策略 1.4 日志存储1.5 分区数如何选择 二、kafka常见问题及调优
2.1 kafka启动慢及消费者重新订阅慢
2.1.1 kafka重启慢问题2.1.2 kafka消费者重启后重新订阅很慢 2.2 kafka消息顺序性分析2.3 kafka如何保证不丢消息2.4 kafka高性能的原因 三、kafka集群的CAP问题
3.1 一致性
3.1.1 数据写一致性控制3.1.2 数据读一致性控制 3.2 **可用性**3.3 分区扩展性
一、kafka高级特性 1.1 producer发布消息机制消息写入方式
producer采用push模式将消息发送到broker,消息将被顺序写入对应的partition中,因为顺序写的极高,保证了kafka的吞吐效率。消息路由机制
producer发消息到时,因为在多分区中只能写入唯一分区,所以消息写入分区号是需要计算的。
指定分区号,则直接使用该分区
public ProducerRecord(String topic, Integer partition, K key, V value)未指定分区,指定key[hash取模指定分区]
public ProducerRecord(String topic, K key, V value) //对当前key进行hash,取模分区数 Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;分区号及key均未指定[RoundRobinPartitioner 轮询发送指定分区]
public ProducerRecord(String topic, V value) //轮询RoundRobin int nextValue = nextValue(topic); List消息写入过程availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; }
1.2 Controller及partition选举机制 1.2.1 Controller选举机制(zk控制)
kafka集群启动时,所有broker向zookeeper发送创建**/Controller临时节点的请求,由zookeeper来保证只有1个broker节点能创建成功,该broker则当选为kafka集群中的Controller,负责管理集群分区和副本状态信息。
当controller节点的broker宕机,则zookeeper中的/Controller**临时节点消失,所有存活的broker将再次竞争创建临时节点,由zookeeper保证唯一的新broker成为Controller。
具备控制器身份的broker需要比其他普通的broker多一份职责,负责监听整个集群所有分区和副本的状态。具体细节如下:
监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。
监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic
所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。
更新集群的元数据信息,同步到其他普通的broker节点中。
1.2.2 partition选举Leader机制unclean.leader.election.enable=false 不选非Isr集合中的broker
选择Isr集合中,第一个broker作为leader。(第一个broker最先放进ISR列表,可能是同步数据最多的副本)unclean.leader.election.enable=true 如果Isr列表为空,将选择AR中可用的broker作为partition的lerder
副本进入ISR列表有两个条件:
副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由
replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)
1.3 Rebalance再平衡机制
1.3.1 rebalance的前提
消费者通过subscribe不指定分区消费的情况下会发生rebalance,assign这种指定分区消费的方式则不会再平衡。
1.3.2 什么情况下会发生rebalance消费者组的consumer数量发生增减(重启某台消费者服务器时势必也会造成rebalance)动态给topic增加分区消费者组订阅了更多的topic 1.3.3 Rebalance的过程
第一阶段:选择组协调器(选小组长)第二阶段:加入消费组Join Group第三阶段:Sync Group 1.3.4 Rebalance分配策略
rebalance共有3中再平衡策略:range、round-robin、sticky
range(范围分配) 假设有10个分区,消费者组内有3个消费者,则:
消费者0被分配:partition-0、partition-1、partition-2、partition-3
消费者1被分配:partition-4、partition-5、partition-6
消费者2被分配:partition-7、partition-8、partition-9
round-robin(轮询分配) 假设有9个分区,消费者组内有3个消费者,则:
消费者0被分配:partition-0、partition-3、partition-6
消费者1被分配:partition-1、partition-4、partition-7
消费者2被分配:partition-2、partition-5、partition-8
sticky(稳态分配)
sticky与round-robin的分配方案比较类似,但显著的区别时,sticky尽可能保证本次分配与上次相同。
即原有分配方案,尽可能不变。
sticky发生rebalance的两个原则:
1)分区分配尽可能均匀2)再平衡尽可能与上次保持相同 1.4 日志存储
kafka消息存储在指定的log.dir目录下,topic对应的日志信息以Topic+分区号命名。
为避免日志文件过大,kafka为.log文件指定分段大小,以避免文件过大影响读写效率
单个日志分段大小
log.segment.bytes: The maximum size of a single log file
Default: 1073741824 = 1GB
存储文件消息内容的文件.log文件
为提高查询效率,.log文件的内容以每次4K(可配置)的offset作为索引值记录到.index文件中。存储索引offset的文件,可以快速定位消息的offset进而查找到.log真是消息内容的位置。
.timeindex索引文件,同样在.log文件中每次以4K发送一次来到.timeindex中。
1.5 分区数如何选择bin/kafka-producer-perf-test.sh --topic my_local_topic --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.server=192.168.149.128:9092 acks=1二、kafka常见问题及调优 2.1 kafka启动慢及消费者重新订阅慢 2.1.1 kafka重启慢问题
kafka-server-start.sh 修改默认初始化堆内存及最大堆内存
#原值
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
#根据物理内存情况32G内存可置为16G
export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:metaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1He apRegionSize=16M"
#最大STW50ms,使用G1垃圾回收器,单元格大小16M,元空间256M,初始化内存16G.最大内存16G,年轻代10G
修改启动实例或关闭实例时梳理日志文件线程数
在启动kafka时,需要后台线程将log.dir目录中topic的分区segment log读取到内核空间,当partition分区中的数据很多(或许日志删除hour过大,保留日志量很大),后台默认的num.recovery.threads.per.data=1将忙不过来,可以试着将该线程数放大
2.1.2 kafka消费者重启后重新订阅很慢
当有消费者宕机或重启,这是生产中非常常见的状态,将导致coordinator认为分区与消费者的对应关系发生变化,需要重新平衡消费者于分区关系。
session.timeout.ms 根据服务需求,可大可小,当该值更小时,更容易触发rebalance,当该值更大时,可能消费者已宕机而broker未发现。max.poll.interval.ms 如果消费者处理消息的时间,超过该设置值,将视为消费者处理能力差,或被踢出ConsumerGrouprebalance策略,可以选择sticky以减少已有稳定分区的重新分配。 2.2 kafka消息顺序性分析 2.3 kafka如何保证不丢消息 2.4 kafka高性能的原因
磁盘顺序读写
kafka消息不能修改及不会在文件中间删除内容,保证了每次写内容都是在文件末尾的追加,从而保证写文件的顺序行;每次读内容时有.index文件的索引查找,及.log文件的分段,又能极大结合磁道读取内容的顺序行,从而提高读取效率。
数据传输零拷贝
读写数据的批量batch处理及压缩传输
三、kafka集群的CAP问题什么是CAP,当分布式系统同时涉及读写时,需要保证一致性(Consistence)、可用性(Availabiliy)、分区容错性(Partition Tolerance)的平衡。成熟的分布式系统一般是在一致性、可用性方面找到平衡点,依据服务的需求设计出更贴近使用的CAP系统。
3.1 一致性当强调一致性时,需保证所有的ISR必须完成数据的同步才算完成生产者数据的写入,而此时从可用性来看,只要Leader存活,是可以保证可用性的。除非Leader宕机,Follower才需要从ISR中靠前的broker选择接替Leader,短时间出现不可用。
当强调可用性时,也可以通过保证最终一致性(如ack=-1,>2各副本完成数据同步),使集群其他节点最终数据一致性,从而实现可用性和一致性得以兼得。
分布式系统一般存在多个副本,Leader副本和多个Follower副本,如果系统能够保证线性一致性当然极好,但如果不能保证线性一致,进而保证顺序一致性也能满足需要。
什么是线性一致性?
线性一致性(Linearizability),也称原子一致性(atomic consistency)、强一致性(strong consistency)。从字面意思就可以理解,多个副本之间的同步状态像是集群只有1个副本。所有的操作都是原子的,当针对leader进行更新时,多个follower副本的状态也需要同步更新完成才能让该条更新结果对客户端可见。
显而易见,线性一致性对消费者客户端是十分友好的,我和朋友小张分别拿着手机看NBA总决赛最后10s,当小张看到Ray Allen三分绝杀比赛结束时,我看到的却还是比分持平,等待发球,我的内心是无法接收的。
但做到线性一致性是需要付出更多代价的,比如在kafka中消息生产者可以将 ACKS=ALL或-1,同时 min.insync.replicas(最少同步副本数量)设置为AR(所有副本数),此时生产者发送消息必须所有副本写入完成才算成功。但这毫无疑问将导致集群可用性的降低。什么是顺序一致性(Sequence Consistency)
假设分布式系统中,X的原值为0,ClientA对X的值修改为1,那么因为网络的延迟等原因,虽然B0、C1的执行事件与A0存在先后顺序,但在服务端真正执行的顺序却是不确定的。所以B0、C1由于与A0存在并行关系,所以B0、C1得到X=0或X=1都是可能的。
顺序一致性,需要保证的就是:当一个Client从服务端得到最新的结果后,后续其他Client得到的结果也必须是最新的。既:当B0如果读到X=1,那么C1也只能得到X=1。这样就保证了有序的一致性。
我们从kafka的角度再详细的看读写一致性问题。
3.1.1 数据写一致性控制
更新的数据只能由Leader写入,然后同步超过半数以上副本算写入成功,其他未立即同步的副本会在后续完成写入,可得数据写入是线性一致、强一致的。
Acks=-1 或Acks=All min.insync.replicas >= num/2+num%2 (保证同步副本超过半数)3.1.2 数据读一致性控制
kafka官网中介绍,Consumer客户端连接不同的Partition分区,虽然有了HW高水位的存在保证消费者能读取到的消息水位是相同的,但由于不同分区的log-end-offset偏移量是不同的(显然Leader的最大偏移量是最大的),当有2各客户端分别读取Follower1和Follower2,此时X=3尚未同步到Follower2。如果此时
疑问:
1、kafka能否保证消息不丢? 不能 why?
2、集群部分节点挂了怎么处理的?ISR
3、能否整包所有消息有序?不能,只能保证分区内有序
4、是否支持消息幂等?producer可调整配置支持幂等,消费者端需要业务自行实现
一致性、高可用、可扩展性
(一致性)消息丢失及处理(消息保障)
(一致性)消费确认机制等



