栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

4.Kafka Broker 总体工作流程和文件存储

4.Kafka Broker 总体工作流程和文件存储

Kafka Broker 总体工作流程和文件存储

1.broker启动后在zk上注册2.controller谁先注册,由哪个controller决定leader选举3.由选举出来的Controller监听brokers节点变化4.Controller决定leader选举5.Controller将节点的信息上传到zk6.其他Controller从zk上同步相关信息7.如果Broker中的leader挂了8.Controller监听到变化9.获取zk上的ISR(存活的followers)10.选举出新的leader,在ISR中存活且在AR中拍在前面的优先

具体流程如下图:

Kafka服役新节点

准备hadoop105机器,修改server.properties的broker.id 为 3单独启动 hadoop105 中的 kafka

bin/kafka-server-start.sh -daemon ./config/server.properties

创建一个要均衡的主题

 vim topics-to-move.json
 #输入下面的内容
 {
 	"topics": [
 		{"topic": "first"}
 	],
 	"version": 1
}

生成一个负载均衡的计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)

vim increase-replication-factor.json
#增加刚刚kafka生成的计划
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

验证

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
退役旧节点

执行负载均衡操作

 vim topics-to-move.json
 #输入下面的内容
 {
 	"topics": [
 		{"topic": "first"}
 	],
 	"version": 1
}

生成一个负载均衡的计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

创建副本存储计划

vim increase-replication-factor.json
#增加刚刚kafka生成的计划
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,2,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,0,1],"log_dirs":["any","any","any"]}]}

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

验证 Kafka 副本

Kafka 副本作用:提高数据可靠性。Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

AR = ISR + OSRISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。 kafka Leader 和 Follower 故障处理细节

follower故障

Follower发生故障后会被临时踢出ISR这个期间Leader和其他Follower继续接收数据待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。

HW(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。HW(High Watermark):所有副本中最小的LEO 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

Leader故障处理细节

Leader发生故障之后,会从ISR中选出一个新的Leader为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

手动调整分区副本存储

​ 在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副

本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

调整步骤:

创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

vim increase-replication-factor.json

#输入以下内容
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}] 
}

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

查看是否更改成功

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

# 更改前
Topic: three    TopicId: NPN8zL2fTWuRvaSaWFb7zQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: three    Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: three    Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: three    Partition: 3    Leader: 2       Replicas: 2,0   Isr: 2,0

# 更改后
Topic: three    TopicId: NPN8zL2fTWuRvaSaWFb7zQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: three    Partition: 0    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 1    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0,1
        Topic: three    Partition: 3    Leader: 1       Replicas: 1,0   Isr: 0,1
kafka的文件存储 kafka的文件存储机制

​ Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment(每个segment大小1G)。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

index 文件和 log 文件详解

怎样根据offset找到记录在log文件中的位置

根据目标offset定位到segment文件(segment文件名末尾是offset)在index文件中找到小于等于目标offset的最大offset对应的索引项根据索引项在log文件中找到对应的目标

如下面的例子:找offset=600的record。

首先根据offset=600找到位于segment1中然后在segment1的index文件中,找到比600小的offset587,对应position6410根据position6410在log中找到6410的位置开始往下找到offset=600的记录

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

log.retention.hours,最低优先级小时,默认 7 天log.retention.minutes,分钟。log.retention.ms,毫秒。

优先级:毫秒>分钟>小时(如设置毫秒则设置的分钟将失效)

log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。(默认每5分钟判断数据有没有超时)

超时后,Kafka 中提供的日志清理策略有 delete 和 compact 两种,默认delete

delete 日志删除:将过期数据删除(log.cleanup.policy = delete 所有数据启用删除策略)

基于时间的delete策略。以一个segment中最大的时间作为这个segment的时间(如果一个segment,一部分数据超过7天,一部分数据没超过7天,则不会删除这个segment)基于大小的delete策略,超过设置的日志大小则删除最早的segment。(一般不用)

compact 日志压缩

对于相同key的不同value值,只保留最后一个版本。 kafka读写速度快的原因

Kafka 本身是分布式集群,可以采用分区技术,并行度高读数据采用稀疏索引,可以快速定位要消费的数据顺序写磁盘。记录追加到log文件中

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784966.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号