协议基本数据类型
固定宽度类型变长数据类型数组类型 请求响应结构
请求格式响应格式
协议基本数据类型 固定宽度类型变长数据类型int8, int16, int32, int64 , 采用大端字节序
数组类型bytes, string, 用一个整型来存储字节长度, 加上该长度的字节数据来表示。 整型值为-1表示null, 字符串用int16来表示长度, 字节数据由int32来表示长度
请求响应结构长度用一个int32类型数据表示, 加上该长度的其他类型数据的数组
RequestOrResponse => Size (RequestMessage | ResponseMessage)
Size => int32
Size - 一个4字节整数, 来表示一个字节长度, 后面跟随该长度的字节数据, 来表示请求响应的消息
用wireshark抓下包看看
kafka用的2.11版本, java客户端用的是2.7的版本
因为我们使用的kafka端口是9093, 所以配置抓取过滤为 tcp port 9093
然后启动我们的应用, 应用代码很简单, 自行编写, 就是producer发送一个字符串, consumer过5秒拉取并打印
显示过滤配置 tcp.flags & 0x002来过滤出syn包, 这样可以查看建立几次连接, 方便追踪tcp数据流
这里要注意下, wireshark中配置的kafka协议解析端口是9092, 如果我们的kafka服务端口不是9092, 要修改
编辑->首选项->protocols->找到kafka协议->tcp ports里加上你的kafka端口号, “,”分隔
这样配置后, wireshark就会自动解析你的kafka协议,, 方便分析
我们追踪第一个tcp流数据看下
我们可以看到三次握手后, 客户端会发送一个request请求
我们看下请求内容
请求格式可以看到, 开始是MessageSize, 4个字节, 值是0x14, 表示后面的message字节长度为20个
apikey占2个字节, 值是0x12(18), 表示ApiVersions请求
api version 占2个字节, 版本号0 ,这个是ApiVersions的版本号
CorrelationId 占4个字节
ClientId 占12个字节, 内容是producer-1, 前面说过, 字符串是个数组类型, 需要一个unit16来表示长度,加上 producer-1字节长度为10, 总共长度12字节
最后可以看出整个message内容长度为20字节
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage ApiKey => int16 ApiVersion => int16 CorrelationId => int32 ClientId => string RequestMessage => metadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
| 字段名 | 说明 |
|---|---|
| ApiKey | 一个数字id, 表示是哪种请求(meta请求, 生产请求, 拉取请求等) |
| ApiVersion | 表示这个api的版本号, 会关系到响应格式 |
| CorrelationId | 用户提供的整数数据, 服务器不会修改, 直接通过响应传回。 |
| ClientId | 这是用户提供的客户端id。 |
Response => CorrelationId ResponseMessage CorrelationId => int32 ResponseMessage => metadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
| 字段名 | 说明 |
|---|---|
| CorrelationId | 服务端回传客户端请求发过来的CorrelationId内容 |
再用wireshark看下请求响应的包
这个是请求协议数据, 从蓝底字开始分析字节数据内容
4个字节(蓝底部分) —— 请求消息体长度为20字节
2个字节 —— api key表示请求消息类型, 18表示请求获取服务端支持的api 版本号明细
2个字节 —— api版本号, 使用的是0版本
4个字节 —— 客户端提供, 服务端会通过响应消息体传回该数据
12个字节 —— 前俩个字节是一个unit16的数据类型, 表示字符串长度是10个字节, 后面10个字节表示字符串字节编码内容
这个是响应的协议数据, 第一个图是解析后的内容 , 包含了服务端支持的共43个API协议
接下来分析第二个图中的字节数据含义, 从蓝底字节开始
4个字节 —— 响应体长度, 268个字节长度
4个字节 —— 之前请求消息中的correlationId值, 通过响应消息这个字段传回来了
2个字节 —— 状态码, errorcode, 0表示没出错
4个字节 —— unit32类型, 代表接下来的字节数组长度, 这里是表示数据元素的个数, 不是字节长度, 要注意, 0x2b值表示字节数组元素个数有43个
接下来循环43次,每6个字节代表一个api version信息对象, 这里可能有人有疑惑, 怎么确定的这6个字节代表一个api version对象呢? 这是根据前面请求消息体中api key 和api version来确定的, 之前咱们有提到过, 会关系到响应体的格式, correlationId可以确定是哪个请求的响应。
那我们再看下这6个字节又有什么含义呢
前2个字节表示api key值, 用来区分是哪种消息类型
中间2个字节表示服务端支持的最小版本号
后2个字节表示服务端支持的最大版本号
现在通过抓包分析, 大家应该对kafka协议的请求响应消息格式有了更清楚直观的认识, 接下来我们再学习kafka协议的其他内容。



