栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka进阶

Kafka进阶

一、操作系统

先说结论:建议部署在 Linux 上。


由于以下三个原因

I/O 模型

数据网络传输效率

Kafka的社区支持度

① I/O 模型层面

I/O 模型是操作系统执行 IO 指令的方法。

分别有五种类型

    阻塞式 IO

    非阻塞式 IO

    IO 多路复用

    信号驱动 IO

    异步 IO

可以简单认为后面的模型比前面的模型要更高效,epoll 模型介于第三种和第四种之间,select 属于第三种。

Kafka 的客户端底层使用了 Java 的 selector,而 selector 在 Linux 的实现是 epoll,在 Windows 上实现机制为 select。因此 Kafka 部署在 Linux 会有更高效的 I/O 性能。

② 网络传输效率

数据在磁盘和网络之间进行传输时候,在 Linux 上可以享受到零拷贝机制带来的快捷和便利高效,而 Windows 则不行。

③ 社区支持度

Linux 平台出现的问题,社区会优先解决。

Windows 平台出问题,一般不会解决。

二、磁盘

先说结论:

    选用普通机械硬盘即可。

    如果资金充裕可以将硬盘组 Raid;

    如果资金紧张,不使用 Raid 方案也可以保证读写性能。


问题一:选用普通机械硬盘还是固态硬盘?

使用普通机械硬盘即可,Kafka 存储方式为顺序读写,机械硬盘的最大劣势在于随机读写慢。所以使用机械硬盘并不会造成性能低下。

问题二:是否需要将磁盘组 raid?

raid 的优势在于:提供冗余磁盘存储空间,提供负载均衡。

但是 Kafka 自身已经有冗余机制,而且通过分区的设计,实现了负载均衡的功能。所以如果有经济能力可以放在组 raid 的存储空间上,如果考虑性价比,可以直接不做 raid。

三、磁盘容量

先说结论,需要考虑的因素有以下几个:

    每天的消息数

    每条消息大小

    副本数

    消息保留时间

    是否启用压缩


问题讨论:Kafka 需要多大的存储空间?

设计场景:日志数据每天向 kafka 发送 1 亿条数据,每条数据有两个副本防止数据丢失,数据保存两周,每条消息平均大小为 1KB。

每天 1 亿条 1KB 消息,保存两周两份,则每天总大小为:1 亿*1KB*2/1000/1000=200GB

kafka 除了消息数据还有其他类型的数据,故增加 10%的冗余空间,则需要 220GB

两周时间则为 220GB*14≈3TB

如果启用压缩,压缩比约在 0.75 左右,则总存储空间规划为 3TB*0.75=2.25TB

四、带宽

先说结论:

    如果网络为万兆带宽,基本不会出现网络瓶颈,如果数据量特别大,按照下文中的设计场景进行计算。

    如果网络为百兆或者千兆带宽,在处理较大数据量场景下会出现网络瓶颈,可按照下面的传统经验公式进行计算处理,也可按照下述场景按照自己生产实际情况进行设计。

    经验公式:服务器台数 = 2 × (生产者峰值生产速率 × 副本数 ÷ 100) +  


带宽情况最容易成为 kafka 的瓶颈。

设计场景:如果机房为千兆带宽,我们需要在一小时内处理 1TB 的数据,需要多少台 kafka 服务器?

由于带宽为千兆网,1000Mbps=1Gbps,则每秒钟每个服务器能收到的数据量为 1Gb=1000Mb

假设 Kafka 占用整个服务器网络的 70%(其他 30%为别的服务预留),则 Kafka 可以使用到 700Mb 的带宽,但是如果从常规角度考虑,我们不能总让 Kafka 顶满带宽峰值,所以需要预留出 2/3 甚至 3/4 的资源,也就是说,Kafka 单台服务器使用带宽实际应为 700Mb/3=240Mb

1 小时需要处理 1TB 数据,1TB=1024*1024*8Mb=8000000Mb,则一秒钟处理数据量为:8000000Mb/3600s=2330Mb 数据。

需要的服务器台数为:2330Mb/240Mb≈10 台。

考虑到消息的副本数如果为 2,则需要 20 台服务器,副本如果为 3,则需要 30 台服务器。

五、内存

先说结论:建议安装 Kafka 的服务器节点的内存至少大于等于 16G。


Kafka 的内存由堆内存+页缓存组成。

① 堆内存配置

建议每个节点 10G-15G

需要在kafka-server-start.sh进行修改

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
② Kafka 的 GC 情况查询

通过 jps 命令查看 Kafka 的进程号

[atguigu@hadoop102 kafka]$ jps2321 Kafka5255 Jps1931 QuorumPeerMain

根据 Kafka 进程号,查看 Kafka 的 GC 情况

[atguigu@hadoop102 kafka]$ jstat -gc 2321 1s 10 S0C    S1C    S0U    S1U      EC       EU        OC         OU       MC      MU     CCSC   CCSU   YGC     YGCT    FGC     FGCT      GCT 0.0   7168.0  0.0   7168.0 103424.0 60416.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 60416.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 60416.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 60416.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 60416.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 61440.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 61440.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 61440.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 61440.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531 0.0   7168.0  0.0   7168.0 103424.0 61440.0  1986560.0   148433.5  52092.0 46656.1 6780.0 6202.2     13    0.531   0      0.000    0.531

名词解释:

S0C:第一个幸存区的大小

S1C:第二个幸存区的大小

S0U:第一个幸存区的使用大小

S1U:第二个幸存区的使用大小

EC:伊甸园区的大小

EU:伊甸园区的使用大小

OC:老年代大小

OU:老年代使用大小

MC:方法区大小

MU:方法区使用大小

CCSC:压缩类空间大小

CCSU:压缩类空间使用大小

YGC:年轻代垃圾回收次数

YGCT:年轻代垃圾回收消耗时间

FGC:老年代垃圾回收次数

FGCT:老年代垃圾回收消耗时间

GCT:垃圾回收消耗总时间


③ Kafka 的堆内存占用查询

根据 Kafka 进程号,查看 Kafka 的堆内存

[atguigu@hadoop102 kafka]$ jmap -heap 2321Attaching to process ID 2321, please wait...Debugger attached successfully.Server compiler detected.JVM version is 25.212-b10

using thread-local object allocation.Garbage-First (G1) GC with 8 thread(s)

Heap Configuration:   MinHeapFreeRatio         = 40   MaxHeapFreeRatio         = 70   MaxHeapSize              = 2147483648 (2048.0MB)   NewSize                  = 1363144 (1.2999954223632812MB)   MaxNewSize               = 1287651328 (1228.0MB)   OldSize                  = 5452592 (5.1999969482421875MB)   NewRatio                 = 2   SurvivorRatio            = 8   metaspaceSize            = 21807104 (20.796875MB)   CompressedClassSpaceSize = 1073741824 (1024.0MB)   MaxmetaspaceSize         = 17592186044415 MB   G1HeapRegionSize         = 1048576 (1.0MB)
Heap Usage:G1 Heap:   regions  = 2048   capacity = 2147483648 (2048.0MB)   used     = 246367744 (234.95458984375MB)   free     = 1901115904 (1813.04541015625MB)   11.472392082214355% usedG1 Young Generation:Eden Space:   regions  = 83   capacity = 105906176 (101.0MB)   used     = 87031808 (83.0MB)   free     = 18874368 (18.0MB)   82.17821782178218% usedSurvivor Space:   regions  = 7   capacity = 7340032 (7.0MB)   used     = 7340032 (7.0MB)   free     = 0 (0.0MB)   100.0% usedG1 Old Generation:   regions  = 147   capacity = 2034237440 (1940.0MB)   used     = 151995904 (144.95458984375MB)   free     = 1882241536 (1795.04541015625MB)   7.471886074420103% used
13364 interned Strings occupying 1449608 bytes
④ Kafka 页缓存

页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(默认值为 1G)中 25%的数据在内存中就好。


综合上述,Kafka 在大数据场景下能够流畅稳定运行至少需要 11G,建议安装 Kafka 的服务器节点的内存至少大于等于 16G。

六、CPU 选择

先说结论:建议 Kafka 服务器 CPU 核数在 32 以上。


观察所有的 Kafka 与线程相关的配置,一共有以下几个

参数名备注默认值
num.network.threads服务器用于接收来自网络的请求并向网络发送响应的线程数3
num.io.threads服务器用于处理请求的线程数,其可能包括磁盘 I/O8
num.replica.fetchers副本拉取线程数,调大该值可以增加副本节点拉取的并行度1
num.recovery.threads.per.data.dir每个数据目录在启动时用于日志恢复和在关闭时刷新的的线程数1
log.cleaner.threads用于日志清理的后台线程数1
background.threads用于各种后台处理任务的线程数10

其中,第 4 个参数在启动和关闭时候才会使用,日志清理也是在一定时间间隔才会有。所以,常驻线程应该有至少 22 个以上。

在生产环境中,建议 CPU 核数最少为 16 核,建议 32 核以上,方可保证大数据环境中的 Kafka 集群正常处理与运行。

七、Kafka集群所有重要参数配置详解与优化设置

成文时间:2022年2月22日。参数如有变化,请以官网参数和默认值为准!

Broker端配置详解

官网提示:以下三个参数必配:

参数名备注默认值
broker.id此参数必配,每台服务器都不相同且唯一。-1
log.dirs此参数必配,而且建议多目录。而且最好多个目录分属于不同的物理磁盘。
好处1:提高读写性能,多块物理磁盘能使读写具有更高的IO量;
好处2:实现故障转移,如果磁盘坏了,可以自动将数据转移到正常磁盘上,且Broker不会宕机。
如果这个参数没配,则使用log.dir中的参数配置,默认为/tmp/kafka-logs
null
log.dir如果log.dirs没配,则使用log.dirnull
zookeeper.connect此参数必配。
格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path。
例如我们经常配置的:hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka,
切记不要写成:hadoop102:2181/kafka,hadoop103:2181/kafka,hadoop104:2181/kafka
null

监听器相关参数
参数名备注默认值
listeners监听器,规定协议与访问主机名和端口,默认值PLAINTEXT://:9092,这里面需要配置的是一个三元组,格式为<协议名称://主机名:端口号>,将hostName指定为0.0.0.0绑定到所有接口。将hostname留出为空绑定到默认接口。如果配置了自定义的协议名称,还需要配置listener.security.protocol.map来告诉协议底层使用了何种安全协议。PLAINTEXT://:9092
advertised.listeners对外发布的监听器地址,为客户端使用,不同于listeners,此参数允许端口重复,作负载均衡用。如果需要外网访问则会用到此参数,一般情况下,不需要设置!null
control.plane.listener.name配置监听器的名字,一般用不到,默认为nullnull
controller.listener.namesKRaft模式下有用,zk模式下用不到。null

主题相关参数
参数名备注默认值
auto.create.topics.enable测试环境中,这个参数通常开启,设置为true,但是生产环境中一般关闭,设置为false。我们会发现一个现象,如果我们不创建主题,直接向名为“test”的主题发送消息,则kafka中会使用默认主题配置自动创建名为test的topic,但是这种情况一般在生产环境中不允许。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false。true
delete.topic.enable是否允许删除主题,默认为true,一般不修改。一般会通过权限管理来限制部分用户删除主题。个别权限控制严格的公司可能会修改为false,但这样就无法删除topic,生产中建议不修改!true

线程相关参数
参数名备注默认值
background.threads默认为10,后台处理任务的线程数。如果不出问题,默认不修改。10
num.io.threads服务器用于处理请求的线程数,其可能包括磁盘I/O8
num.network.threads服务器用于接收来自网络的请求并向网络发送响应的线程数3
num.recovery.threads.per.data.dir在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数1
num.replica.alter.log.dirs.threads可以在日志目录之间移动副本的线程数,这可能包括磁盘I/O,不算常驻线程,无需设置。null
num.replica.fetchers用于复制来自源代理的消息线程的数量。增加该值可以增加follower broker中的I/O平行度的程度。1

压缩相关参数

生产中,一般Kafka会配置压缩以减少磁盘占用。

参数名备注默认值
compression.type配置消息压缩格式,有以下几种:'gzip', 'snappy', 'lz4', 'zstd',生产环境中需要按照实际是否需要压缩进行配置。
还可以配置'uncompressed'意为不压缩,还可以配置'producer'意为和producer端的压缩格式保持一致。
producer

附:Facebook Zstandard官网提供的压缩算法对比结果:

Compressor nameRatioCompressionDecompress
zstd 1.3.4-12.877470 MB/s1380 MB/s
zlib 1.2.11-12.743110 MB/s400 MB/s
brotli 1.0.2-02.701410 MB/s430 MB/s
quicklz 1.5.0-12.238550 MB/s710 MB/s
lzo1x 2.09-12.108650 MB/s830 MB/s
lz4 1.8.12.101750 MB/s3700 MB/s
snappy 1.1.42.091530 MB/s1800 MB/s
lzf 3.6-12.077400 MB/s860 MB/s

ZooKeeper相关参数

ZK相关的参数一般不做修改。默认即可。

参数名备注默认值
controller.quorum.election.backoff.max.ms开始新的选举前的最长时间(毫秒),防止选举陷入僵局。默认1000ms,不修改。1000
controller.quorum.election.timeout.ms没有leader后触发选举的等待市场,默认1000ms,不修改1000
controller.quorum.fetch.timeout.ms在成为候选者和触发投票选举之前从当前leader处拉取数据失败等待的最大时长,默认2000ms。不修改。2000
controller.quorum.voters参与投票的服务器列表,逗号分割。需要填入非空的集合“”
zookeeper.connection.timeout.ms客户端等待与Zookeeper建立连接的最大时间。
如果没有设置,则使用zookeeper.session.timeout.ms中的值
null
zookeeper.max.in.flight.requests客户端在阻塞前发送给Zookeeper的未确认请求的最大数量。10
zookeeper.session.timeout.msZK的会话超时时间18000

重平衡与选举相关参数

重平衡相关参数需要根据实际需求进行调整,原理类似于HDFS中的重平衡。

参数名备注默认值
unclean.leader.election.enable是否将未设置在ISR中的副本作为最后的手段来选举为leader,即使这样做可能会导致数据丢失。
这个参数默认为false。意为关闭“不干净的”leader选举。
Kafka有多个副本,每个分区都有多个副本,但只有一个leader副本对外提供服务。不是所有的副本都有资格竞选leader,只有保存数据量多的副本才有资格竞选,即所谓的ISR中的才能竞选。未在ISR中的副本被称为unclean的。
如果此参数设置为true,则未在ISR中的副本也可参与竞选,这样就会有丢失数据的风险。
生产环境中建议设置此值为false!
false
auto.leader.rebalance.enable请注意,此参数在生产环境中极易被忽视。
设置为true则表示允许Kafka定期进行对一些topic的leader的重新选举。即便是之前的leader没有任何问题,也有可能在满足选举条件之后换leader。
但是在生产中换leader的成本极高,且没有性能收益,所以在生产中建议设置为false!
true
leader.imbalance.check.interval.seconds检查各个分区是否平衡的频率,默认300s,不修改300
leader.imbalance.per.broker.percentage触发重平衡的阈值百分比,默认为10。
举例,如果一个broker上有10个分区,有两个分区的leader不是preferred leader,比例超过10%,则会触发重平衡。
10

日志刷写相关参数

在Linux系统中,当我们把数据写入文件系统之后,其实数据在操作系统的pagecache里面,并没有刷到磁盘上。如果操作系统挂了,数据就丢失了。一方面,应用程序可以调用fsync这个系统调用来强制刷盘,另一方面,操作系统有后台线程,定时刷盘。频繁调用fsync会影响性能,需要在性能和可靠性之间进行权衡。实际上,官方不建议通过上述的三个参数来强制写盘,认为数据的可靠性通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。

Kafka的持久性并非要求同步数据到磁盘,因为问题节点都是从副本中恢复数据。这样刷盘依赖操作系统及Kafka的后台刷盘机制。这样的好处是:无需调优、高吞吐量、低延时和可全量恢复。

操作系统一般默认30s刷盘一次。

参数名备注默认值
log.flush.interval.messages刷写数据到磁盘的消息条数间隔,默认值为Long的最大值9223372036854775807,不建议修改!9223372036854775807
log.flush.interval.ms消息在刷新到磁盘之前保存在内存中的最大时间(以ms为单位)。如果没有设置,则使用log.flush.scheduler.interval.ms中的值,默认nullnull
log.flush.scheduler.interval.ms日志刷新器检查是否有日志需要刷新到磁盘的频率(以ms为单位),默认值Long的最大值9223372036854775807,不建议修改!9223372036854775807
log.flush.offset.checkpoint.interval.ms更新作为日志恢复点的上次刷新的持久记录的频率,默认60000ms(1min)。60000
log.flush.start.offset.checkpoint.interval.ms更新日志开始偏移量的持久记录的频率,默认60000ms(1min)。60000

日志保留相关参数

日志保留相关参数需要根据具体的生产实际及磁盘容量与数据量进行调整。

参数名备注默认值
log.retention.bytes删除日志前的日志大小,此参数一般不用,默认值为-1。表示在当前Broker上保存的数据容量不受限制。
如果你所在的公司是一个对外提供云服务的公司,需要做租户管理,那么这个参数就有设置的必要性,可以为单租户设置Kafka保存消息的容量上限。
-1
log.retention.hours日志保留的时长,默认168小时,7天168
log.retention.minutes日志保留的分长,默认nullnull
log.retention.ms日志保留的毫秒值。默认null,如果设置为-1,那么日志无限期保留。
以上三个参数的优先级为:ms > minutes > hours,如果没有配ms,找minutes,如果没有minutes找hours
null

日志滚动切片相关参数

日志滚动与切片参数建议根据生产实际进行调整。

参数名备注默认值
log.roll.hours优先级小于log.roll.ms,日志滚动的最大时长,默认值168hours,也就是7days168
log.roll.ms日志滚动的最大时长,默认值null,这个值未设置会去找log.roll.hoursnull
log.roll.jitter.hours给日志段的切分加一个扰动值,避免大量日志段在同一时间进行切分操作。
如果发现kafka有周期性的磁盘I/O打满情况,建议设置此值。
0
log.roll.jitter.ms同上一个参数,优先级高于log.roll.jitter.hoursnull
log.segment.bytes单个log文件的最大大小,默认1073741824,也就是1G1073741824
log.segment.delete.delay.ms从文件系统中删除文件之前等待的时间60000 (1 minute)

元数据相关参数

元数据相关参数一般不做调整。

参数名备注默认值
metadata.log.dir这个配置决定了我们在KRaft模式下将集群的元数据日志放在哪里。
如果没有设置,元数据日志将放在log.dirs的第一个日志目录中。如果不是KRaft模式,元数据在ZK中
null
metadata.log.max.record.bytes.between.snapshots这是在生成新快照之前,日志中从最新快照到高水位之间的最大字节数。默认20MB20971520
metadata.log.segment.bytes单个元数据日志文件的最大大小。默认1GB1073741824
metadata.log.segment.ms新元数据日志文件滚出之前的最大时间(以毫秒为单位)。默认7天。604800000
metadata.max.retention.bytes删除旧快照和日志文件前元数据日志和快照的最大组合大小。因为在删除任何日志之前必须至少存在一个快照,所以这是一个软限制。-1
metadata.max.retention.ms在删除元数据日志文件或快照之前保存该文件或快照的毫秒数。因为在删除任何日志之前必须至少存在一个快照,所以这是一个软限制。默认7天。604800000

副本相关参数

副本相关参数一般不做调整。

参数名备注默认值
min.insync.replicas当生产者将ack设置为“all”(或“-1”)时,min.insync.replicas指定必须确认写入成功的最小副本数量。如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasApend)
此参数和ack设置配合使用可以允许您强制执行更大的耐用性保证。典型的方案是创建一个主题,复制因子为3,设置min.insync.replicas为2,并使用ack级别为all,如果大多数副本没有收到写入,这将确保生产者提出异常。
1
replica.fetch.min.bytes每次读取响应所需的最小字节数。如果没有足够的字节,则等待下面的replica.fetch.wait.max.ms参数1
replica.fetch.wait.max.msfollower副本发出的每个获取器请求的最大等待时间。该值在任何时候都应该小于replica.lag.time.max.ms,以防止低吞吐量主题的ISR频繁收缩500
replica.high.watermark.checkpoint.interval.ms将高水位保存到磁盘的频率5000
replica.lag.time.max.ms如果follower没有发送任何fetch请求,或者没有消耗leader日志结束偏移量,leader将从ISR中删除follower30000
replica.socket.receive.buffer.bytes用于网络请求的套接字接收缓冲区,64K65536
replica.socket.timeout.ms网络请求的套接字超时。它的值至少应该是replica.fetch.wait.max.ms30000

offset相关参数

offset内部主题相关参数,一般保持默认即可。

参数名备注默认值
offset.metadata.max.bytes配置offset请求的最大请求消息大小,默认4K4096
offsets.commit.required.acks配置提交offset请求的ack值,默认-1-1
offsets.commit.timeout.ms配置提交offset的最长等待时间,5s5000
offsets.load.buffer.size用于读取offset信息到内存cache时候,读取缓冲区的大小,默认5M。5242880
offsets.retention.check.interval.ms定期检查offset过期数据的周期,默认600秒。600000
offsets.retention.minutes针对一个offset的消费记录的最长保留时间,默认为10080分钟,即7天。10080
offsets.topic.compression.codecoffset主题的压缩解码器,可以用来实现原子提交0
offsets.topic.num.partitionsoffset主题的默认分区数,默认50个,这与我们看到的默认offset主题中的分区数一致。50
offsets.topic.replication.factoroffset主题的副本因子,默认3个。低于这个数值,offset主题将创建失败。3
offsets.topic.segment.bytesoffset字节应该保持相对较小,以促进更快的日志压缩和缓存负载,默认100M104857600

消息相关参数
参数名备注默认值
message.max.bytesKafka允许的最大消息批次大小,如果启用了压缩,那么计算压缩之后的大小。
在老版本的Kafka中,如果消息未压缩,则不会进行批处理,则此值设置的为单条消息的大小上限。
最新官方文档中默认数值为1048588,刚刚比1M多出来12Bytes。
需要注意,如果在实际生产中,你的单条消息超过了1M,则必须增大此值。
一般地,在生产环境中,尤其是有接收较大单条数据的场景中,为了防止该值过小造成接收数据失败,均需将此值调大。



Topic级别配置详解

topic级别的参数,一般都在broker中对应有默认配置,但是也可以对单独的topic进行设置,可以在topic创建之初使用--config来进行指定,也可以在创建完成之后再进行修改。

以下是比较重要的topic级别的参数配置。

日志清理压缩相关参数
参数名备注默认值未指定时参考配置
cleanup.policy指定过期日志使用的保留策略,默认为delete将删除过期日志段,设置为compact将会进行压缩。可选值为delete或者compactdeletelog.cleanup.policy
compression.type参考broker设置中的压缩,可选值为[uncompressed, zstd, lz4, snappy, gzip, producer]producercompression.type
delete.retention.ms配此配置专门针对tombstone类型的消息进行设置,默认为86400000,也就是1天,这个tombstone在当次compact完成后并不会被清理,在下次compact时候,最后的修改时间加上此配置时间值大于当前时间才会被删除。86400000log.cleaner.delete.retention.ms
file.delete.delay.ms从文件系统中删除文件前的等待时间60000log.segment.delete.delay.ms
min.cleanable.dirty.ratio默认值为0.5,此配置项控制日志压缩的比率,比率越高,则压缩的日志越少。0.5log.cleaner.min.cleanable.ratio
min.compaction.lag.ms此参数配合上面的min.cleanable.dirty.ratio使用。配置该值后,如果上述压缩比率满足,且日志在该值持续时间内有未压缩记录,则日志符合压缩条件。0log.cleaner.min.compaction.lag.ms
max.compaction.lag.ms消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。配置该值后,如果日志在该值持续时间内有未压缩记录,则日志符合压缩条件。9223372036854775807log.cleaner.max.compaction.lag.ms

日志刷写相关参数
参数名备注默认值未指定时参考配置
flush.messages强制刷写的消息条数,建议不要设置此值!!
相关说明参考broker端配置。
9223372036854775807log.flush.interval.messages
flush.ms强制刷写的时间间隔,建议不要设置此值!!  
相关说明参考broker端配置。
9223372036854775807log.flush.interval.ms

索引相关参数
参数名备注默认值未指定时参考配置
index.interval.bytes索引的间隔,我们知道Kafka为稀疏索引,默认值为4096,如果设置较小的值,则索引间隔变短,索引的容量会变大,但能更快命中。建议不修改,默认即可。4096log.index.interval.bytes

消息、副本与选举相关参数
参数名备注默认值未指定时参考配置
max.message.bytes参考broker端的message.max.bytes配置项,设置单条消息的大小上限。建议在生产中调大!1048588message.max.bytes
message.timestamp.type定义消息中的时间戳取创建时间还是日志追加时间,可选值为[CreateTime, LogAppendTime],默认为CreateTime,无需修改。CreateTimelog.message.timestamp.type
min.insync.replicas具体解释参考broker端配置项min.insync.replicas1min.insync.replicas
unclean.leader.election.enable具体解释参考broker端配置项unclean.leader.election.enablefalseunclean.leader.election.enable

日志保留与滚动相关参数
参数名备注默认值未指定时参考配置
retention.bytes具体解释参考broker端配置项log.retention.bytes-1log.retention.bytes
retention.ms具体解释参考broker端配置项log.retention.ms604800000log.retention.ms
segment.bytes具体解释参考broker端配置项log.segment.bytes1073741824log.segment.bytes
segment.index.bytes具体解释参考broker端配置项log.index.size.max.bytes10485760log.index.size.max.bytes
segment.jitter.ms具体解释参考broker端配置项log.roll.jitter.ms0log.roll.jitter.ms
segment.ms具体解释参考broker端配置项log.roll.ms604800000log.roll.ms



Producer端配置详解

Producer负责向服务器发送数据,在实际生产中,更多的是使用API作为Producer端进行数据发送。

以下是Producer端中比较重要的配置参数。

序列化、分区器、拦截器相关参数
参数名备注默认值
key.serializer需填写key的序列化器,实现org.apache.kafka.common.serialization.Serializer接口null
value.serializer需填写value的序列化器,实现org.apache.kafka.common.serialization.Serializer接口null
partitioner.class分区器的全类名。可以通过实现org.apache.kafka.clients.producer.Partitioner接口自定义分区器。org.apache.kafka.clients.producer.internals.DefaultPartitioner
interceptor.classes拦截器全类名,可以通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口自定义拦截器null

集群地址与压缩相关参数
参数名备注默认值
bootstrap.servers须填写Kafka集群的地址与端口,如果包含多个broker,可以只填写其中一个,但是为预防填写单个broker产生宕机,建议填写全部可连接broker地址null
compression.type压缩格式,默认为none。因为broker端压缩配置默认为producer,意为与producer保持一致。可选值为[none, zstd, lz4, snappy, gzip]。
设置压缩可以提高消息发送效率。
none

批处理相关参数
参数名备注默认值
buffer.memory生产者用于将消息发送至服务器的总缓冲区大小,如果消息发送的速度超过了传送到服务器的速度,生产者将会阻塞max.block.ms时间,之后抛出异常。
默认值为33554432,也就是32M。
生产实际中 ,如果单条消息过大,需要调整该参数。
33554432
batch.size此参数十分重要!
默认16K。
如果消息发往同一个分区,producer端将会把消息以批处理的形式发送,一个批次的大小由该参数控制!
如果消息大小大于该值,则消息不会以批处理形式发送。
该值不宜设置过大,太大的批会浪费内存空间。
如果一批数据未达到该值,则该批数据会等待linger.ms时间后发送。
为了提升处理效率,Kafka将从缓冲区中预先分配batch.size大小的内存空间。
生产中一般会根据实际消息大小,将buffer.memory和batch.size配合调整。
16384
linger.ms一批消息发送之前的等待时长,默认为0。合理设置此时间将有助于有效填满批次,提高效率。但是如果每次一批数据都能攒满发送,设置此值将会造成整体消息延迟。生产中需要根据具体需求设置。0
max.request.size单个请求的最大值。如果单个消息过大,需要调整该值。1048576

TCP缓冲相关参数
参数名备注默认值
receive.buffer.bytesTCP接收缓冲大小,默认不修改32768
send.buffer.bytesTCP发送缓冲大小,默认不修改131072

消息有序性相关参数
参数名备注默认值
retries此值大于0,客户端将会重新发送失败的消息。如果max.in.flight.requests.per.connection的值不为1,则重试有可能会改变消息的有序性。一般这个值默认即可,不需要调整,重试由delivery.timeout.ms参数控制。2147483647
acks重要参数!ack级别决定了一个请求完成之前,生产者要求leader返回的确认数。
可选值有[all, -1, 0, 1]。
acks为0表示生产者不会等待服务器确认,这样不能保证服务器已经收到消息。
acks为1表示leader已经确认,但是未等待follower确认。
ack为all或者-1需要等待ISR中的所有副本确认,保证数据不丢失。
all
enable.idempotence默认为true。生产者将确保每个消息精准写入一次。如果设置为false,可能因为broker端失败而重复写入消息。如果要启用幂等,需要满足:max.in.flight.requests.per.connection小于等于5,retries大于0,acks为all。true
max.in.flight.requests.per.connection客户端在单个连接上发送的未确认请求的最大数量。如果此值大于1,且enable.idempotence为false,则消息有可能因重试而产生乱序。5

事务相关参数
参数名备注默认值
transactional.id如果需要启用事务,需要分配该值。默认为nullnull



Consumer端配置详解

Consumer对数据进行消费,一般通过流处理框架对Kafka中的数据进行消费处理,因此,Consumer端是否消费正常对于数据处理显得尤为重要。

以下是Consumer端的重要参数。

必配参数
参数名备注默认值
bootstrap.servers须填写Kafka集群的地址与端口,如果包含多个broker,可以只填写其中一个,但是为预防填写单个broker产生宕机,建议填写全部可连接broker地址null
group.id规定消费者属于哪个消费者组。如果需要使用Topic,Partition和Group来限定消费offset,则此值必须设置!null
auto.offset.resetKafka中没有初始偏移量的时候,或者当前偏移量在服务器上没有时候的消费策略。
可选值:[latest, earliest, none]。
1.earliest会重置到最早偏移量。
2.latest重置为最近偏移量。
3.设置为none时候,如果消费者所在的组没有找到之前的偏移量,则抛出异常。
latest

反序列化、拦截器相关参数
参数名备注默认值
key.deserializerkey的反序列化器,需要实现org.apache.kafka.common.serialization.Deserializer接口null
value.deserializervalue的反序列化器,需要实现org.apache.kafka.common.serialization.Deserializer接口null
interceptor.classes拦截器的类。可通过实现org.apache.kafka.clients.consumer.ConsumerInterceptor进行自定义。null

自动提交offset参数
参数名备注默认值
enable.auto.commit设置是否周期性自动提交offset。如果在代码中手动维护偏移量,则需要将此值设置为false。true
auto.commit.interval.ms如果设置了自动提交enable.auto.commit为true,那么该参数生效,提交间隔默认为5秒。5000

拉取数据相关参数
参数名备注默认值
fetch.min.bytes服务器在获取请求时返回的最小数据量,默认为1,意味着只要有一个字节的数据可用,就会立即相应请求,调大该值将会让服务器等待接受更多数据,但是会略微增加延迟。1
fetch.max.bytes服务器在获取请求时返回的最大数据量。默认50M。
如果Kafka发送的每条消息都比这个数值要大,那只会返回非空分区中的第一条数据。
Kafka中所能接收的最大消息大小受两个参数控制:
1.broker端参数message.max.bytes
2.topic级别参数max.message.bytes
52428800
fetch.max.wait.ms如果没有足够的数据满足fetch.min.bytes,服务器将等待的最大时长。500
max.partition.fetch.bytes默认1M。
服务器将返回每个分区的最大数据量。
1048576

分区分配策略参数
参数名备注默认值
partition.assignment.strategy消费者的分区分配策略。有以下几种:org.apache.kafka.clients.consumer.RangeAssignor,
org.apache.kafka.clients.consumer.RoundRobinAssignor,
org.apache.kafka.clients.consumer.StickyAssignor,
org.apache.kafka.clients.consumer.CooperativeStickyAssignor。
第一种按照消费者总数和分区总数进行均分,
第二种为轮循分配,
第三种为黏性分区,
第四种与第三种类似,但是多了再平衡。
默认为第一种。
也可以自定义,只需要实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口
class org.apache.kafka.clients.consumer.RangeAssignor,
class org.apache.kafka.clients.consumer.CooperativeStickyAssignor
receive.buffer.bytesTCP接收缓冲大小,默认不修改65536

TCP缓冲及其他参数
参数名备注默认值
receive.buffer.bytesTCP接收缓冲大小,默认不修改65536
send.buffer.bytesTCP发送缓冲大小,默认不修改131072
exclude.internal.topics是否排除内部主题。
我们都知道Kafka有一个内部主题为__consumer_offsets,如果使用正则方式订阅主题且匹配到了内部主题,则该参数控制是否将内部主题暴露给消费者。如果为false,则只能通过订阅的方式来消费内部topic。
true
allow.auto.create.topics允许自动创建topic。需要broker端auto.create.topics.enable参数为true才能使用。
此参数默认即可。
true
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/748518.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号