如果 broker 端没有显式配置 listeners(或 advertised.listeners)使用 IP 地址,那么最好将 bootstrap.server 配置成主机名而不要使用 IP 地址,因为 Kafka 内部使用的是全称域名(Fully Qualified Domain Name)。如果不统一,则会出现无法获取元数据的异常。
broker.idbroker.id 是 broker 在启动之前必须设定的参数之一,在 Kafka 集群中,每个 broker 都有唯一的 id(也可以记作 brokerId)值用来区分彼此。broker 在启动时会在 ZooKeeper 中的 /brokers/ids 路径下创建一个以当前 brokerId 为名称的虚节点,broker 的健康状态检查就依赖于此虚节点。当 broker 下线时,该虚节点会自动删除,其他 broker 节点或客户端通过判断 /brokers/ids 路径下是否有此 broker 的 brokerId 节点来确定该 broker 的健康状态。
可以通过 broker 端的配置文件 config/server.properties 里的 broker.id 参数来配置 brokerId,默认情况下 broker.id 值为-1。在 Kafka 中,brokerId 值必须大于等于0才有可能正常启动,但这里并不是只能通过配置文件 config/server.properties 来设定这个值,还可以通过 meta.properties 文件或自动生成功能来实现。
首先了解一下 meta.properties 文件,meta.properties 文件中的内容参考如下:
#Sun May 27 23:03:04 CST 2018 version=0 broker.id=0
meta.properties 文件中记录了与当前 Kafka 版本对应的一个 version 字段,不过目前只有一个为0的固定值。还有一个 broker.id,即 brokerId 值。broker 在成功启动之后在每个日志根目录下都会有一个 meta.properties 文件。 meta.properties 文件与 broker.id 的关联如下:
- 如果 log.dir 或 log.dirs 中配置了多个日志根目录,这些日志根目录中的 meta.properties 文件所配置的 broker.id 不一致则会抛出 InconsistentBrokerIdException 的异常。
- 如果 config/server.properties 配置文件里配置的 broker.id 的值和 meta.properties 文件里的 broker.id 值不一致,那么同样会抛出 InconsistentBrokerIdException 的异常。
- 如果 config/server.properties 配置文件中并未配置 broker.id 的值,那么就以 meta.properties 文件中的 broker.id 值为准。
- 如果没有 meta.properties 文件,那么在获取合适的 broker.id 值之后会创建一个新的 meta.properties 文件并将 broker.id 值存入其中。
如果 config/server.properties 配置文件中并未配置 broker.id,并且日志根目录中也没有任何 meta.properties 文件(比如第一次启动时),那么应该如何处理呢?
Kafka 还提供了另外两个 broker 端参数:broker.id.generation.enable 和 reserved.broker.max.id 来配合生成新的 brokerId。broker.id.generation.enable 参数用来配置是否开启自动生成 brokerId 的功能,默认情况下为 true,即开启此功能。自动生成的 brokerId 有一个基准值,即自动生成的 brokerId 必须超过这个基准值,这个基准值通过 reserverd.broker.max.id 参数配置,默认值为1000。也就是说,默认情况下自动生成的 brokerId 从1001开始。
自动生成的 brokerId 的原理是先往 ZooKeeper 中的 /brokers/seqid 节点中写入一个空字符串,然后获取返回的 Stat 信息中的 version 值,进而将 version 的值和 reserved.broker.max.id 参数配置的值相加。先往节点中写入数据再获取 Stat 信息,这样可以确保返回的 version 值大于0,进而就可以确保生成的 brokerId 值大于 reserved.broker.max.id 参数配置的值,符合非自动生成的 broker.id 的值在 [0, reserved.broker.max.id] 区间设定。
初始化时 ZooKeeper 中 /brokers/seqid 节点的状态如下:
[zk: xxx.xxx.xxx.xxx:2181/kafka(CONNECTED) 6] get /brokers/seqid null cZxid = 0x200001b2b ctime = Mon Nov 13 17:39:54 CST 2018 mZxid = 0x200001b2b mtime = Mon Nov 13 17:39:54 CST 2018 pZxid = 0x200001b2b cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 0
可以看到 dataVersion=0,这个就是前面所说的 version。在插入一个空字符串之后,dataVersion 就自增1,表示数据发生了变更,这样通过 ZooKeeper 的这个功能来实现集群层面的序号递增,整体上相当于一个发号器。
[zk: xxx.xxx.xxx.xxx:2181/kafka(CONNECTED) 7] set /brokers/seqid "" cZxid = 0x200001b2b ctime = Mon Nov 13 17:39:54 CST 2017 mZxid = 0x2000e6eb2 mtime = Mon May 28 18:19:03 CST 2018 pZxid = 0x200001b2b cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 2 numChildren = 0
大多数情况下我们一般通过并且习惯于用最普通的 config/server.properties 配置文件的方式来设定 brokerId 的值,如果知晓其中的细枝末节,那么在遇到诸如 InconsistentBrokerIdException 异常时就可以处理得游刃有余,也可以通过自动生成 brokerId 的功能来实现一些另类的功能。
bootstrap.serversbootstrap.servers 不仅是 Kafka Producer、Kafka Consumer 客户端中的必备参数,而且在 Kafka Connect、Kafka Streams 和 KafkaAdminClient 中都有涉及,是一个至关重要的参数。
如果你使用过旧版的生产者或旧版的消费者客户端,那么你可能还会对 bootstrap.servers 相关的另外两个参数 metada.broker.list 和 zookeeper.connect 有些许印象,这3个参数也见证了 Kafka 的升级变迁。
我们一般可以简单地认为 bootstrap.servers 这个参数所要指定的就是将要连接的 Kafka 集群的 broker 地址列表。不过从深层次的意义上来讲,这个参数配置的是用来发现 Kafka 集群元数据信息的服务地址。为了更加形象地说明问题,我们先来看一下下图。
客户端 KafkaProducer1 与 Kafka Cluster 直连,这是客户端给我们的既定印象,而事实上客户端连接 Kafka 集群要经历以下3个过程,如上图中的右边所示。
- 客户端 KafkaProducer2 与 bootstrap.servers 参数所指定的 Server 连接,并发送 metadataRequest 请求来获取集群的元数据信息。
- Server 在收到 metadataRequest 请求之后,返回 metadataResponse 给 KafkaProducer2,在 metadataResponse 中包含了集群的元数据信息。
- 客户端 KafkaProducer2 收到的 metadataResponse 之后解析出其中包含的集群元数据信息,然后与集群中的各个节点建立连接,之后就可以发送消息了。
在绝大多数情况下,Kafka 本身就扮演着第一步和第二步中的 Server 角色,我们完全可以将这个 Server 的角色从 Kafka 中剥离出来。我们可以在这个 Server 的角色上大做文章,比如添加一些路由的功能、负载均衡的功能。
下面演示如何将 Server 的角色与 Kafka 分开。默认情况下,客户端从 Kafka 中的某个节点来拉取集群的元数据信息,我们可以将所拉取的元数据信息复制一份存放到 Server 中,然后对外提供这份副本的内容信息。
由此可见,我们首先需要做的就是获取集群信息的副本,可以在 Kafka 的 org.apache.kafka.common.request.metadataResponse 的构造函数中嵌入代码来复制信息,metadataResponse 的构造函数如下所示。
public metadataResponse(int throttleTimeMs, Listbrokers, String clusterId, int controllerId, List topicmetadata) { this.throttleTimeMs = throttleTimeMs; this.brokers = brokers; this.controller = getControllerNode(controllerId, brokers); this.topicmetadata = topicmetadata; this.clusterId = clusterId; //客户端在获取集群的元数据之后会调用这个构造函数,所以在这里嵌入代码将5个成 //员变量的值保存起来,为后面的Server提供内容 }
获取集群元数据的副本之后,我们就可以实现一个服务程序来接收 metadataRequest 请求和返回 metadataResponse,从零开始构建一个这样的服务程序也需要不少的工作量,需要实现对 metadataRequest 与 metadataResponse 相关协议解析和包装,这里不妨再修改一下 Kafka 的代码,让其只提供 Server 相关的内容。整个示例的架构如下图所示。
为了演示方便,上图中的 Kafka Cluster1 和 Kafka Cluster2 都只包含一个 broker 节点。Kafka Cluster1 扮演的是 Server 的角色,下面我们修改它的代码让其返回 Kafka Cluster2 的集群元数据信息。假设我们已经通过前面一步的操作获取了 Kafka Cluster2 的集群元数据信息,在 Kafka Cluster1 中将这份副本回放。
在 Kafka 的代码 kafka.server.KafkaApis 中有关专门处理元数据信息的方法如下所示。
def handleTopicmetadataRequest(request: RequestChannel.Request)
我们将这个方法内部的最后一段代码替换,详情如下:
//sendResponseMaybeThrottle(request, requestThrottleMs => // new metadataResponse( // requestThrottleMs, // brokers.flatMap(_.getNode(request.context.listenerName)).asJava, // clusterId, // metadataCache.getControllerId // .getOrElse(metadataResponse.NO_CONTROLLER_ID), // completeTopicmetadata.asJava // )) sendResponseMaybeThrottle(request, requestThrottleMs => BootstrapServerParam.getmetadata(requestThrottleMs) )
上面示例代码中有“//”注释的是原本的代码实现,没有“//”注释的两行代码是我们修改后的代码实现,代码里的 BootstrapServerParam.getmetadata() 方法也是需要自定义实现的,这个方法返回的就是从 Kafka Cluster2 中获取的元数据信息的副本回放,BootstrapServerParam 的实现如下:
public class BootstrapServerParam {
public static final String topic = "topic-demo";
public static metadataResponse getmetadata(int throttleTimeMs){
Node node = new Node(0, "localhost", 9093);
List brokers = Collections.singletonList(node);
int controllerId = 0;
String clusterId = "64PniqfkRHa4ASfUisNXrw";
List empty = new ArrayList<>();
Partitionmetadata pmeta1 = new
Partitionmetadata(Errors.NONE, 0, node, brokers, brokers, empty);
Partitionmetadata pmeta2 = new
Partitionmetadata(Errors.NONE, 1, node, brokers, brokers, empty);
Partitionmetadata pmeta3 = new
Partitionmetadata(Errors.NONE, 2, node, brokers, brokers, empty);
Partitionmetadata pmeta4 = new
Partitionmetadata(Errors.NONE, 3, node, brokers, brokers, empty);
List pmetaList = new ArrayList<>();
pmetaList.add(pmeta1);
pmetaList.add(pmeta2);
pmetaList.add(pmeta3);
pmetaList.add(pmeta4);
Topicmetadata tmeta1 = new
Topicmetadata(Errors.NONE, topic, false, pmetaList);
List tmetaList = new ArrayList<>();
tmetaList.add(tmeta1);
return new metadataResponse(throttleTimeMs, brokers,
clusterId, controllerId, tmetaList);
}
}
示例代码中用了最笨的方法来创建了一个 metadataResponse,如果我们在复制 Kafka Cluster2 元数据信息的时候使用了某种序列化手段,那么在这里我们就简单地执行一下反序列化来创建一个 metadataResponse对象。
修改完 Kafka Cluster1 的代码之后我们将它和 Kafka Cluster2 都启动起来,然后创建一个生产者 KafkaProducer 来持续发送消息,这个 KafkaProducer 中的 bootstrap.servers 参数配置为 Kafka Cluster1 的服务地址。我们再创建一个消费者 KafkaConsumer 来持续消费消息,这个 KafkaConsumer 中的 bootstrap.servers 参数配置为 Kafka Cluster2 的服务地址。
实验证明,KafkaProducer 中发送的消息都流入 Kafka Cluster2 并被 KafkaConsumer 消费。查看 Kafka Cluster1 中的日志文件,发现并没有消息流入。如果此时我们再关闭 Kafka Cluster1 的服务,会发现 KafkaProducer 和 KafkaConsumer 都运行完好,已经完全没有 Kafka Cluster1 的任何事情了。
这里只是为了讲解 bootstrap.servers 参数所代表的真正含义而做的一些示例演示,笔者并不建议在真实应用中像示例中的一样分离出 Server 的角色。
在旧版的生产者客户端(Scala 版本)中还没有 bootstrap.servers 这个参数,与此对应的是 metadata.broker.list 参数。metadata.broker.list 这个参数很直观,metadata 表示元数据,broker.list 表示 broker 的地址列表,从取名我们可以看出这个参数很直接地表示所要连接的 Kafka broker 的地址,以此获取元数据。而新版的生产者客户端中的 bootstrap.servers 参数的取名显然更有内涵,可以直观地翻译为“引导程序的服务地址”,这样在取名上就多了一层“代理”的空间,让人可以遐想出 Server 角色与 Kafka 分离的可能。在旧版的消费者客户端(Scala 版本)中也没有 bootstrap.servers 这个参数,与此对应的是 zookeeper.connect 参数,意为通过 ZooKeeper 来建立消费连接。
很多读者从 0.8.x 版本开始沿用到现在的 2.0.0 版本,对于版本变迁的客户端中出现的 bootstrap.servers、metadata.broker.list、zookeeper.connect 参数往往不是很清楚。这一现象还存在 Kafka 所提供的诸多脚本之中,在这些脚本中连接 Kafka 采用的选项参数有 --bootstrap-server、--broker-list 和 --zookeeper(分别与前面的3个参数对应),这让很多 Kafka 的老手也很难分辨哪个脚本该用哪个选项参数。
--bootstrap-server 是一个逐渐盛行的选项参数,这一点毋庸置疑。而 --broker-list 已经被淘汰,但在 2.0.0 版本中还没有完全被摒弃,在 kafka-console-producer.sh 脚本中还是使用的这个选项参数,在后续的 Kafka 版本中可能会被替代为 --bootstrap-server。--zookeeper 这个选项参数也逐渐被替代,在目前的 2.0.0 版本中,kafka-console-consumer.sh 中已经完全没有了它的影子,但并不意味着这个参数在其他脚本中也被摒弃了。在 kafka-topics.sh 脚本中还是使用的 --zookeeper 这个选项参数,并且在未来的可期版本中也不见得会被替换,因为 kafka-topics.sh 脚本实际上操纵的就是 ZooKeeper 中的节点,而不是 Kafka 本身,它并没有被替代的必要。
服务端参数列表下表6列出了部分服务端重要参数。
| 参 数 名 称 | 默 认 值 | 参 数 释 义 |
|---|---|---|
| auto.create.topics.enable | true | 是否开启自动创建主题的功能 |
| auto.leader.rebalance.enable | true | 是否开始自动leader再均衡的功能 |
| background.threads | 10 | 指定执行后台任务的线程数 |
| compression.type | producer | 消息的压缩类型。Kafka支持的压缩类型有Gzip、Snappy、LZ4等。默认值“producer”表示根据生产者使用的压缩类型压缩,也就是说,生产者不管是否压缩消息,或者使用何种压缩方式都会被broker端继承。“uncompressed”表示不启用压缩 |
| delete.topic.enable | true | 是否可以删除主题 |
| leader.imbalance.check.interval.seconds | 300 | 检查leader是否分布不均衡的周期 |
| leader.imbalance.per.broker.percentage | 10 | 允许leader不均衡的比例,若超过这个值就会触发leader再均衡的操作(前提是auto.leader.rebalance.enable参数也要设定为true) |
| log.flush.interval.messages | 9223372036854775807(Long.MAX_VALUE) | 如果日志文件中的消息在存入磁盘前的数量达到这个参数所设定的阈值时,则会强制将这些刷新日志文件到磁盘中。消息在写入磁盘前还要经历一层操作系统页缓存,如果期间发生掉电,则这些页缓存中的消息会丢失,调小这个参数的大小会增大消息的可靠性,但也会降低系统的整体性能 |
| log.flush.interval.ms | null | 刷新日志文件的时间间隔。如果没有配置这个值,则会依据log.flush. scheduler.interval.ms参数设置的值来运作 |
| log.flush.scheduler.interval.ms | 9223372036854775807(Long.MAX_VALUE) | 检查日志文件是否需要刷新的时间间隔 |
| log.retention.bytes | -1 | 日志文件的最大保留大小(分区级别,注意与log.segment.bytes的区别) |
| log.retention.hours 168 | (7天) | 日志文件的留存时间,单位为小时 |
| log.retention.minutes | null | 日志文件的留存时间,单位为分钟 |
| log.retention.ms | null | 日志文件的留存时间,单位为毫秒。log.retention.{hours |
| log.roll.hours | 168(7天) | 经过多长时间之后会强制新建一个日志分段,默认值为7 |
| log.roll.ms | null | 同上,不过单位为毫秒。优先级比log.roll.hours要高 |
| log.segment.bytes | 1073741824(1GB) | 日志分段文件的最大值,超过这个值会强制创建一个新的日志分段 |
| log.segment.delete.delay.ms | 60000(60秒) | 从操作系统删除文件前的等待时间 |
| min.insync.replicas | 1 | ISR集合中最少的副本数 |
| num.io.threads | 8 | 处理请求的线程数,包含磁盘I/O |
| num.network.threads | 3 | 处理接收和返回响应的线程数 |
| log.cleaner.enable | true | 是否开启日志清理的功能 |
| log.cleaner.min.cleanable.ratio | 0.5 | 限定可执行清理操作的最小污浊率 |
| log.cleaner.threads | 1 | 用于日志清理的后台线程数 |
| log.cleanup.policy | delete | 日志清理策略,还有一个可选项为compact,表示日志压缩 |
| log.index.interval.bytes | 4096 | 每隔多少个字节的消息量写入就添加一条索引 |
| log.index.size.max.bytes | 10485760(10MB) | 索引文件的最大值 |
| log.message.format.version | 2.0-IV1 | 消息格式的版本 |
| log.message.timestamp.type | CreateTime | 消息中的时间戳类型,另一个可选项为LogAppendTime。CreateTime表示消息创建的时间,LogAppendTime表示消息追加到日志中的时间 |
| log.retention.check.interval.ms | 300000(5分钟) | 日志清理的检查周期 |
| num.partitions | 1 | 主题中默认的分区数 |
| reserved.broker.max.id | 1000 | broker.id能配置的最大值,同时reserved.broker.max.id+1也是自动创建broker.id值的起始大小,详细参考6.5.1节 |
| create.topic.policy.class.name | null | 创建主题时用来验证合法性的策略,这个参数配置的是一个类的全限定名,需要实现org.apache.kafka.server. policy.CreateTopicPolicy接口 |
| broker.id.generation.enable | true | 是否开启自动生成broker.id的功能 |
| broker.rack | null | 配置broker的机架信息 |
从第6节到这里主要讲解 Kafka 服务端的一些核心概念,方便读者加深对 Kafka 的理解,并且为下面章节中的讲解做好铺垫。比如涉及的协议设计,它可以帮助我们更好地理解组协调器和事务协调器的工作原理,还可以帮助我们理解 Kafka 中的数据可靠性、数据一致性等更深层次的理念。如果读者对Kafka的源码有兴趣,那么掌握好这些内容可以让你在阅读源码的时候事半功倍。



