目录
broker配置
Topic配置
Producer配置
Consumer配置
kafka服务器优化
broker配置
一个独立的 Kafka 服务器被称为broker。它接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
| 配置项 | 说明 |
| broker.id | 每个broker 都需要有个标识符,使用 broker.id 来表示。它的默认值是0,也可以被设成其任意整数。这个值在整个kafka集群里必须是唯一的。这个值可以任意选定,如果出于维护的需要,可以在服务器节点间交换使用这些ID 。建议把它们设置成与机器名具相关性的整数,这样在进行维护时,将 ID 号映射到机器名就没那么麻烦了。例如,如果机器名包含唯一性的数字(比如 hostl.example.com、host2.example.com ),那么用这些数来设置 broker.id 就再好不过了。 |
| port | kafka默认会监听 9092 端口。修改 port 配置参数可以把它设成其他任意可用的端口。注意,如果使用1024 以下的端口,需要使用 root 权限启动Kafka ,不过不建议这么做。 |
| zookeeper.connect |
用于保存 b
roker
元数据的
Zoo
ke
地址是通过
zookeepe
.connect
来指定的。
localhost:2181
表示这个
Zooke
是运行在本地的 21
81
端口上
。该
配置参数是用
冒号分
隔的
hostnal'le:por
t/path
列表,每
部分的含义如下:
hostname是Zookeeper服务器的机器名或 IP 地址;port是Zookeeper的客户端连接端口;/parth 是可选的 Zookeeper路径,作为kafka 集群的 chroot 环境。如果不指定,默认使根路径。如果指定的 hroot 路径不存在, broker 会在启动的时候创建它。 |
| lo g . dirs | Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 l og.d irs 指定的。它是一 组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 b roke 会根据“最少使 用”原则,把同一 个分区的日志片段保存到同一个路径下。注意, broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。 |
| nu m.recovery . threads.per. data.dir |
对于
如下三
种情况,
Kafka
会使用可配置的线程池来处理日志片段:
服务器正常启动,用于打开每个分区的日志片段; 服务器崩愤后重启,用于检查和截短每个分区的日志片段;服务器正常关闭,用于关闭日志片段; 默认情况下 ,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会 用到 ,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的 服务器来说,一 旦发生崩愤,在进行恢复时 使用并行操作可能会省下数 小时的时 间。设置 此参数时需要注意,所配置的数字对应的是 log. dir 指定的单个日志目录。也就是说,如果num.recovery. threads.per.data.dir 被设为8,并且 log.dir 指定了3个路径,那么总共需 24 个线程。 |
| a uto . c r ea t e . topics . enable | 默认 情况下, Kafka 会在如下几种情形下自动创 建主题: 当一 个生产者开始往主题写入消息时; 当一 个消费者开始从主题读取消息时; 当 任意一个客 户端向主题发送元数据请求时; 根据 kafka 协 议,如果一个主题不先被创建, 根本无法知道它是否已经存在。如果显式地创 建主题, 不管是手动创 建还是通过其 配置 系统来 创建,都可以把 auto.create.topi..cs enable 设为 false。 |
如何选定Broker数量:一个Kafka 集群需要多少个 broker 决于以下几个因素:
首先,需要多少磁盘空间来保留数据,以及单个 broker 有多少空间可用。如果整个集群需要保留 10TB 的数据, 每个 broker 可以存储 2TB ,那么至少需要5个 broker 。如果启用了数据复制,那么至少还需要一倍的空间,不过这要取决于配置的复制系数是多少。 也就是说,如果启用了数据复制,那么这个集群至少需要 10 broker。
其次,要考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有关,特别是当有多个消费者存在或者在数据保留期间流量发生波动(比如高峰时段的流量爆发)时。如果单个 broker 的网络接口在高峰时段可以达到80%的使用量,并且有两个消费者,那么消费者就无法保持峰值,除非有两个 broker 。如果集群启用了复制功能,则要把这个额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题,也可以通过扩展多个 broker 来解决。
Topic配置
| 配置项 | 说明 |
| num.par itions | num.par itions 参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值 。该参数 的默认值是1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果耍让一 个主 题的分区个数少于 num.par itions 指定的值,需要手动创建该主题。 |
| log.retention.ms | Kafka 通常根据时间来决定数据可以被保留多久。默认使用 log. r etentlon.hour 参数来配 置时间 ,默认值为 168 小时,也就是 一周。除 此以外,还有其他两个参数 log.retention.minutes和log.retention.ms。这3个参数的作用是一样的,都是决定消息多久以后会被删 除,不过还是推荐使用 log.retention.ms。如果指定了不止一 个参数, Kafka 会优先使用 具有最小值的那个参数。 |
| log.r etention . bytes | 另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含8个分区的主题,并且 log.retention.bytes 被设为 1GB ,那么这个主题最多可以保留 8GB 的数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。如果同时指定了 log.retention.bytes 和 log.retention.bytes (或者另一个时间参数),只要任意一个条件得到满足,消息就会被删除。 |
| log.segment.bytes | 以上的设置都作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上。当日志片段大小达到 log.segment.bytes 定的上限(默认是 1GB )时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘入的整体效率。 如果主题的消息量不大,那么 如何调整这个参数的大小就变得尤为重要。例如,如果一个 主题 每天只接收 100MB 的消息,而 log.segment.bytes 使用默认设置,那么需要 10 天时 间才能填满 个日志片段。因为在日志片段被关闭之前消息是不 会过期的,所以如果 log.r etention.ms 被设为 604 800000 ( 一 周),那么日志片段最多需要 17 天才会过期。 这是因为关闭日志片段需要 10 天的时间,而根据配置的过期时间,还需要再保留7 天时 间(要等到日志片段里的最后一 个消息过期才能被删除) |
| log.segment.ms | 另一个可以控制日志片段关闭时间的参数是 log.segment.ms,它指定了多长时间之后日志片段会被关闭。就像 log.retention.bytes 和 log.retention.ms 这两个参数一样,这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下log.segment.ms有设定值,所以只根据大小来关闭日志片段。在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响。如果多个分区的日志片段永远不能达到大小的上限,就会发生这种情况,因为broker在启动之后就开始计算日志片段的过期时间,对于那些数据量小的分区来说,日志片段的关闭操作总是同时发生。 |
| message . max . bytes | broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认值是 1000000 ,也就是 1MB 。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误信息。跟其他与字节相关的配置参数一样 ,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于message.max.bytes 指定的值,消息的实际大小可以远大于这个值。这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。消费者客户端设置的fetch.message.max.bytes必须与服务器端设置的消息大小进行协调。如果这个值比message.max.bytes小,那么消费者就无法读取比较大的消息,导致出现消费者被阻塞的情况。在为集群里的 broker 配置replica.fetch.max.bytes 参数遵循同样的原则。 |
如何选定分区数量:为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。
主题需要达到多大的吞吐量?例如,是希望每秒钟写入1OOKB还是1GB。从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果你知道消费者将数据写入数据库的速度不会超过每秒50MB,那么你也该知道,从一个分区读取数据的吞吐量不需要超过每秒50MB。可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。每个 broker 包含的分区个数、可用的磁盘空间和网络带宽。如果消息是按照不同的键来写入分区的,那么为已有的主题新增分区就会很困难。单个 broker 对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间也越长。
综合考虑以上几个因素,分区不是越多越好。如果你估算出主题的吐量和悄费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区的个数。也就是说,如果每秒钟要从主题上写入和读取 lGB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要20个分区。这样就可以让20个消费者同时读取这些分区,从而达到每秒钟 lGB 的吞吐量。如果不知道这些信息,那么根据经验,把分区的大小限制在 25G 以内可以得到比较理想的效果。
Producer配置
| 配置项 | 说明 |
| bootstrap.servers | 该属性指定 broker 的地址清单,地址的格式为 host:port 。清单里不需要 包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。建议至少要 提供两个 broker 信息, 一旦其中一个宕机,生产者仍然能够连接到集群上。 |
| key.serializer | broker 希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把 Java 对象作为键和值发送给 broker 。这样的代码具有良好的可读性,不过生产者需要知道如何把这些 Java 对象转换成字节数组。key. serializer必须被设置为一个实现了org.apache.kafka.common.Serializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。 Kafka 客户端默认提供了 ByteArraySerializer、StringSerializer和IntegerSerializer,因此,如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。要注意,key.serializer是必须设置的,就算你打算只发送值内容。 |
| value. serializer | 与 key.serializer一样,value. serializer 指 定的类会将值序列 化。如 果键 和值都是字 符串,可以使用与 key.serializer一 样的序列 化器。如 果键是整数类型而值是字符串那么需要使用不同的序列化器。 |
| acks |
acks
参数指
定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的
可能性有重要影响。
主参数有如下选项。
acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩愤,新的首领还没有被选举出来),生产者会一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的get()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1时更高,因为我们要等待不只一个服务器节点接收消息。 |
| buff er .m e m ory | 该参数 用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果 应用程序发送消息 的速度超过发送到服务器的速度,会导致生产者空间不足。这个 候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffe . full 参数 。 |
| com pre s s i on . type | 默认情 况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip或lz4 ,它指定了 消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Goog 公司 发明, 它占用较少 CPU ,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网 络带宽,可 以使用这种算法。 gzip 压缩算法一 般会占用较多的 ,但会提供更高的压缩 比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和 存储开 销,而这往往是向 Kafka 发送消息的瓶颈所在。 |
| retr ies | 生产者 从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况 下, retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会 放弃重试并返回错民。默认情况下,生产者会在每次重试之间等待 100ms ,不过可以通过 retry .backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试 时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不 过有些错误不是临时性错误,没办法通过重试来解决(比如“悄息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。 你只需要 处理那些不可重试的错误或重试次数超出上限的情况。 |
| bat ch . s ize | 当有多 个消息需要被发送到同一 个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个 批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频 繁地发送消息,会增加一 些额外的开销。 |
| linger.ms | 该参数指定了生产者在发送批 次之前等待更多消息加入批次的时间 KafkaPr odu cer 会在批 次填满或 linger.ms 达到上限时 把批 次发送出去。默认情况下,只要有可用的线程,生 产者就会把消息发送出去,就算批次里只有一 个消息。把 linger.ms 设置成比0 大的 数, 让生产者在发送批 次之前等待 一会 儿,使更多的消息加入到这个批次 。虽然这样会增加延 迟,但也 会提升吞吐量(因为 次性发送更多的消息,每个消息的 开销就变 小了) |
| cl i ent.id | 该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指 标里。 |
| max.in.flight.requests.per . conn e ction | 该参数指定了生产者在收 到服务器晌应之前可以发送多少个消息。它的值越 高,就会占用 越多的内存,不过也 会提升吞吐量。 它设为1 可以保证消息是按照发送的顺序写入服务 器的,即使发生了重试。 |
| timeout.ms、 request.timeout.ms和 metadata. fetch.timeout.ms | request.timeout.ms 指定了生产者在发送数据时 等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时 等待服务器 返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要 么返回错误 (抛出异常或执行回调)。 timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配 一一 如果在指定时间内没有收到同步副本的确认,那么 broker 会返回一 个错误 |
| max . block . ms | 该参数指定了在调用 send () 方法或使用 partitionsFor()方法 获取元数据时 生产者的阻塞 时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就 会阻塞。在阻 塞时间达到 max.block.ms 时,生产者会抛出超 异常。 |
| max . request.size | 该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB ,那么可以发送的单个最 大消 息为 1MB ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每 个消息大小为1KB 。另外,b roker 对可接收的消息最大值也有自己的限制(message.max. bytes ),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝 |
| receive.buffer. bytes和 send. buff er.bytes | 这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小.。果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与 b roker 处于不同的数据中心,那么 可以适当 增大这些值,因为跨数据中心的网络一 般都有比较高的延迟和比较低的 带宽。 |
如何保证消息的顺序:
Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息, broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下 顺序是非常重要的。例如,往一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同。不过,有些场景对顺序不是很敏感。
如果把 reties 设为非零整数,同时把max.in.flight.requests.per.connection 设为比1大的数,那么,如果第1个批次消息写入失败,而第2个批次写入成功, broker 会重试写入第1个批次。如果此时第1个批次也写入成功,那么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把reties设为0。可以 max.in.flight.requests.per.connection 设为1,这样在生产者尝试发送第1批悄息时,就不会有其他的消息发送给 broker 。不过这样会严重影响生产者的吞吐量 ,所以只有在对消息的顺序有严格要求的情况下才能这么做。
Consumer配置
| 配置项 | 说明 |
| boot strap.servers | 指定了 Kafka 集群的连接 字符串, 参照producer配置 |
| key.deserializer | 使用指定的类把字节数组转成 Java 对象,参照producer配置 |
| value.deserializer | 使用指定的类把字节数组转成 Java 对象,参照producer配置 |
| group.id | 指定了 Kafk onsul'le 属于哪 个消费者群组。 |
| f e tc h . min.bytes | 该属性指定了消费者从服务器获取记录的最小字节数。b roker 在收到消费者的数据请求时, 如果可用的数据量小于 f e tc h . min.bytes 指定的大小,那么它会等到有足够的可用数据时 才把它返回给消费者。这样可以降低消费者和 bro ke 的工作负载,因为它们 在主题不是很活跃的时候(或者一 天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。 |
| fetc h . max . w a it.ms | f e tc h . min.bytes 告诉 Kafka ,等到有足够的数据时才把它返回给消费者。而 f etc h . max . w a it.ms 则用于指定 broker 的等待时间,默认是 500ms 。如果没有足够的数据流入 Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致 500m 延迟。 如果要降低 潜在 的延迟(为了满足 SLA ),可以把该参数值设置得小一 些。如果 fetc h . max . w a it.ms 被设为 100ms ,并且 f e tc h . min.bytes 被设为 1 MB ,那么 Kafk 在收到消费者的请求后,要么返 1MB 数据,要么在 100ms 后返回所有可用的数据 就看哪个条件先得到满足。 |
| max. par iti on. fe t ch. bytes | 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB。也就是说,KafkaConsumer.poll()方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。如果一个主题有 20 个分区和5个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。 max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll ()方法来避免会话过期和发生分区再均衡,如果单次调用 poll()返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.parition.fetch.bytes 值改小,或者延长会话过期时间。 |
| session.timeout.ms | 该属性指定了消费者在被认为死亡之前可 以与服 务器断开连接的时间,默认是 3s 。如 果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者 。该属性与 hear tbeat.inter val.ms 紧密相关 hear tbeat.inter val.ms 指定了 poll () 方法向协调器发送 心跳的频 率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性, hear tbeat.inter val.ms 必须比 session.timeout.ms 小,一 般是 session.timeout.ms 的三 分之一 。如果 session.timeout.ms 是 3s ,那么 hear tbeat.inter val.ms 应该是1s。把 session.timeout.ms 值设 得比默认值小,可以更快地检测和恢复崩溃的 节点,不过长时间的轮询或垃圾 收集可能导致 非预期的再均衡。把该属性的值设置得大一 些,可 以减少 意外的再均衡 ,不过 检测节点崩溃需要更长的时间。 |
| auto.offset.reset | 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长 时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest 意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者 启动之 后生成的记录)。另一个值是 ear liest ,意思是说,在偏移 量无效的情况下 ,消费者将从起始位置读取分区的记录。 |
| enable . auto . commit | 该属性指定了消费者是否自动提交偏移 量,默认值是 tr ue 。为了尽量避免出现重复数据和数据丢失,可以把它设为 false ,由自己控制何 时提 交偏移量。如果把它设为 tr ue ,还 可以通过配置 auto.commit.interval.ms 属性来控制提交 的频 率。 |
| partition.assignment.strategy |
我们知道,分区会被分配给群组里的消费者。 PartitionAssignor
根据给定的消费者和主
题,决定哪些分区应该被分配给哪个消费者。
Kafka
有两个默认的分配策略:
Range:该策略会把主题的若干个连续的分区分配给消费者。假设悄费者 C1 和消费者 C2 同时订阅了主题 Tl 和主题 T2 ,井且每个主题有3个分区。那么消费者 Cl 有可能分配到这两个主题的分区 0 和分区1 ,而消费者 C2 分配到这两个主题的分区 2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无怯被消费者数量整除,就会出现这种情况。RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 Cl 和消费者 C2 分配分区,那么消费者 C1 将分到主题 Tl 的分区 0 和分区 2 以及主题 T2的分区 1,消费者 C2 将分配到主题 Tl 分区 1以及主题T2的分区 0 分区 2。一般来说 ,如果所有消费者都订阅相同的主题(这种情况很常见) , RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差 个分区)。 通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是org.apache.kafka. clients. consumer.RangeAssignor , 这个类实现了 Ran ge 策略,不过也可以 把它改成 org.apache.kafka.clients.consumer .RoundRobinAssignor 。我们 还可以使用自定 义策略,在这种情况下 partition.assignment.strategy 属性的值就是自定义类的名字。 |
| clie nt.id | 该属性可 以是任意 字符串 broker 用它来标识从客户端发送过来的消息,通常被用在日志、 度量指标和配额里。 |
| ma x.poll.records | 该属性用于控制单次调用 call () 方法能够返回的记录数量,可以帮你控制在轮询里需要处 理的数据量。 |
| receive.buff er.bytes和 send.buff er.bytes | socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1 ,就使用操 作系统的默认值。如果生产者或消 费者与 broker 处于不同的数据中心内,可以适当增大这 些值,因为跨数据中心的网络一般都有 比较高的延迟和比较低的带宽 |
kafka服务器优化
影响Kafka整体性能的因素一般有:磁盘吞吐量和容量、 内存、网络和 CPU。磁盘性能影响生产者 ,而内存影响消费者。
磁盘吞吐量和容量:生产者生成的消息必须被提交到服务器保存,大多数客户端在发送消息之后会一直等待,直到至少有一个服务器确认悄息已经成功提交为止。也就是说,磁盘写入速度越快,生成消息的延迟就越低。固态硬盘的查找和访问速度都很快,提供了最好的性能。机械硬盘更便宜,单块硬盘容量也更大。在同一个服务器上使用多个机械硬盘,可以设置多个数据目录,或者把它们设置成磁盘阵列,这样可以提升机械硬盘的性能。磁盘容量取决于需要保留的消息数量、保留消息的策略和集群复制策。例如:每天会收到 1TB 消息,并且保留7天,那么就需要7TB 的存储空间,而且还要为其他文件提供至少 10 的额外空间。除此之外,还需要提供缓冲区,用于应付消息流量的增长和波动。通过让主题拥有多个分区,集群的总流量可以被均衡到整个集群,而且如果单 broker 无战支撑全部容量,可以让其 broker 提供可用的容量。因此,是选择传统的机械硬盘(HDD )还是固态硬盘(SSD ),可以根据存储数据量和预算很容易地作出决定。
除了选择合适的磁盘硬件设备和使用 RAID 外,文件系统是影响性能的另一个重要因素。XFT为Kafka 提供了更好的性能,除了由文件系统提供的自动调优之外,无需额外的调优。批量磁盘写入具有更高的效率,可以提升整体的 I/O 吞吐量。XFT成为很多 Linux 发行版默认的文件系统,因为它只需要做少量调优就可以承担大部分的工作负荷,比EXT4具有更好的表现。其次,对文件挂载点的 noatime 参数进行合理的设置,以降低磁盘的写操作,也可提升kafka性能。文件元数据包含3个时间戳:创建时间(ctime )、最后修改时间(mtime)以及最后访问时间(atime)。默认情况下,每次文件被读取后都会更新 atime,这会导致大量的磁盘写操作,而且 atime 属性的用处不大,除非某些应用程序想要知道某个文件在最近一次修改后有没有被访问过(这种情况可以使用 realtime )。 kafka 用不到该属性,所以完全可以把它禁用掉。为挂载点设置 noatime 参数可以防止更新atime,但不会影响 ctime和mtime。
内存:消费者一般从分区尾部读取消息,如果有生产者存在,就紧跟在生产者后面。在这种情况下,消费者读取的消息会直接存放在系统的页面缓存里,这比从磁盘上重新读取要快得多。kafka大量地使用系统页面缓存,如果虚拟内存被交换到磁盘,说明已经没有多余内存可以分配给页面缓存了。一般来说,Linux的虚拟内存会根据系统的工作负荷进行自动调整。我们可以对交换分的处理方式和内存脏页进行调整,从而让 Kafka 更好地处理工作负载。建议把 vm.swappines 参数的值设置得小一点,比如1。该参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。要优先考虑减小页面缓存,而不是进行内存交换, 0意味着“在任何情况下都不要发生交换”。
Kafka 依赖 I/O性能为生产者提供快速的响应,通过将 vm.dirty_backgroud_ratio 设为小于 10 的值,可以减少脏页的数量。vm.dirty_ratio 参数可以增加被内核进程刷新到磁盘之前的脏页数量,可以将它设为大于 20 的值(这也是系统内存的百分比)。这个值可设置的范围很广, 60~80 是个比较合理的区间。不过调整这个参数会带来一些风险,包括未刷新磁盘操作的数量和同步刷新引起的长时间 I/O 等待。如果该参数设置了较高的值,建议启用 Kafka 的复制功能,避免因系统崩溃造成数据丢失。调整内核对脏页的处理方式可以让我们从中获益。
可以在 /proc/vmstat 文件里查看当前脏页数量:
# cat /proc/vmstat | egrep “dirty|writeback”
网络:一般需要对 linux 系统的网络栈进行调优,以实现对大流量的支持。首先可以对分配给 socket读写缓冲区的内存大小作出调整,这样可以显著提升网络的传输性能。 socket 读写缓冲区对应的参数分别是 net.core.wmem_default和net.core.rmem_default,合理的值是 131072 (也就是 28 KB )。读写缓冲区最大值对应的参数分别是net.core.wmem_mx和net.core.rmem_mx ,合理的值是 2097152 (也就是2MB )。要注意,最大值并不意味着每个 socket 一定要有这么大的缓冲空间,只是说在必要的情况下才会达到这个值。
除了设置 socket 外,还需要设置 TCP socket 的读写缓冲区,它们的参数分别是net.ipv4.tcp_wmem和net.ipv4.tcp_rmem。这些参数的值由3个整数组成,它们使用空格分隔,分别表示最小值、默认值和最大值。最大值不能大于net.core.wmem_mx和net.core.rmem_mx指定的大小。例如,“4096 65536 204800 ”表示最小值是4KB 、默认值是64KB、最大值2M 。根据 Kafka 服务器接收流量的实际情况,可能需要设置更高的最大值,为网络连接提供更大的缓冲空间。
还有其它一些有用的网络参数。例如,net.ipv4.tcp_window_scaling设为 1,启用 TCP时间窗扩展,可以提升客户端传输数据的效率,传输的数据可以在服务器端进行缓冲。把net.ipv4.tcp_max_syn_backlog设为比默认值 1024 更大的值,可以接受更多的井发连接net.cor.netdev_max_backlog设为比默认值 1000 更大的值,有助于应对网络流量的爆发,特别是在使用千兆网络的情况下,允许更多的数据包排队等待内核处理。
JVM:Kafka 堆内存的使用率非常高,容易产生垃坡对象,因此调整 Java 垃圾回收参数也能提升kafka性能。Java G1 会自动根据工作负载情况进行自我调节,而且它的停顿时间是恒定的。它可以轻松地处理大块的堆内存,把堆内存分为若干小块的区域,每次停顿时井不会对整个堆空间进行回收。正常情况下, G1 只需要很少的配置就能完成这些工作。例如:一台服务器有 64GB 存,并且使用 5GB 堆内存来运行 Kafka ,那么可以参考以下的配置:MAXGCPauseMillis可以设为 20 mx;InitiatingHeapOccupancyPerent可以设为 35 ,这样可以让垃圾回收比默认的要早一些启动。
G1 的两个调整参数
MAXGCPauseMillis:该参数指定每次垃圾回收默认的停顿时间。该值不是固定的, G1 可以根据需要使用更长的时间。它的默认值是 200ms 。也就是说, G1 会决定垃圾回收的频率以及每一轮需要回收多少个区域,这样算下来,每一轮垃圾回收大概需要 200ms 的时间。
InitiatingHeapOccupancyPerent:该参数指定了在 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是 45 。也就是说,在堆内存的使用率达到 45% 之前, Gl 不会启动垃圾回收。这个百分比包括新生代和老年代的内存。
Kafka 启动脚本井没有启用G1回收器,而是使用了 Parall New 和CMS ( Concurrent Mark-Sw ,并发标记和清除)垃圾回收器。不过它可以通过环境变量来修改。
#export JAVA_HOME=/usr /java / jdkl.8 .0_51
#export KAFKA_JVM_PERFORMANCE_OPTS ”-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true”



