在服务端的介绍中,首先了解下kafka协议。掌握协议不会对日常使用有太大的帮助,但是做到心中有数可以帮助我更好的理解kafka整体的运行策略。
kafka中包含众多协议类型,每种类型的协议都包含请求协议Request和响应协议Response,且所有的请求协议都包相同的协议请求头RequestHeader和不同结果的协议请求体RequestBody,同理,响应协议亦是如此。下面以消息发送协议对和消息拉取协议对为例,说明kafka协议的具体实现
消息发送 ProducerRequest所有请求协议都包含相同的协议头,以ProducerRequest为例:
协议头都包含4个字段:
- api_key: API标识,表明当前协议类型,如PRODUCE、FETCH分别标识发送消息和拉取消息,int16
- 在ProducerRequest中,api_key = "PRODUCE"
- api_version: API协议版本,int16
- 以V6为例, api_version = 6
- correlation_id: 请求唯一id。由客户端指定,作为此次请求的标识, int32
- 该唯一id会作为响应协议的协议请求头返回客户端,客户端以此来识别响应
- client_id: 客户端id, string
协议体包含多个字段:
- transaction_id: 事务id. 在kafka事务中起作用,string
- acks: 表明此次发送消息的ack类型,-1、0、1
- timeout: 请求超时时间。表明此次请求的超时时间,在request.timeout.ms配置项中设置,默认30s
- topic_data: 发送的消息数据,该字段是一个array类型,表示可以发送多个topic消息,数组中每项为一个topic的数据
- topic: 主题名,string
- data: 数据字段也是一个数组array类型,数组每项为一个partition的数据
- partition: 分区编号,int32
- record_set: 分区对应的消息集合,包含多个records数据
客户端在消息发出之前,先把要发送的数据进行归类整理成如上格式,那么服务端就可以以分区为粒度直接进行数据追加。这样可以分散计算压力给客户端(不用集中在服务端处理),提升服务端性能。
ProducerResponse服务端接收到发送请求之后,按照这种协议方式进行数据解析、处理、落盘等操作,就会给客户端响应数据,即发送消息响应:
协议头中仅包含请求协议中的correlation_id,用于客户端拿到响应后知道是哪个请求的响应
响应体包含多个字段:
- throttle_time_ms
- responses: 响应的具体信息,同样是数据类型,每个元素表示一个topic的响应数据
- topic: 主题名称
- partition_responses: 主题中所有的分区响应信息,同样是数组类型,每个元素表示一个分区的响应细节
- partition: 分区号
- error_code: 该分区响应的错误码
- base_offset: 消息集的起始偏移量;由于发送消息集中一个partition发送了多条消息,这里返回消息中的最小偏移量
- log_append_time: 消息写入broker的时间
- log_start_offset: 所在分区的起始偏移量
发送消息协议的响应数据同样是以分区为粒度返回结果
控制器kafka控制器是kafka集群中的某一个broker。该broker被选举为控制器kafka controller后,需要监听整个集群中分区和副本的状态变化,以及保证集群中各节点元数据的更新。比如:
- 当集群中的topic新增或者减少时,控制器需要通知整个集群中的各个节点
- 当集群中的某个分区的leader副本下线时,控制器需要发起该分区的leader副本选举
- 当某个分区的ISR发生变化时,控制器需要将该元信息同步给其他broker节点
等等等等。。。
控制器选举kafka的元数据信息依赖zookeeper,控制器的信息会保存在zookeeper服务器上/controller文件中,如{"version": 1, "brokerid": 0, "timestamp": "xxx"},分别记录控制器的版本、服务节点id、成为控制器的时间戳信息
每个broker启动时会尝试去读取zookeeper的/controller节点信息,如果读到,则表示此时已有控制系统,如果读不到,则会申请成为控制器,当多个broker同时申请成为控制器时,只有一个节点会申请成功,其他均失败。一个集群中任意时刻有且仅有一个控制器
控制器的实现方式kafka的元数据信息都保存在zookeeper上,因此,控制器通过监听zookeepe各节点的变化实现其控制能力。比如在zookeeper中:
- /brokers/topics节点用来保存集群的分区信息,因此控制器需要在此节点上注册handler,发现节点信息变化时快速响应
- /brokers/ids节点可以监听broker变化
- /isr_change_notification节点可以监听ISR节后的变化



