源码下载地址
安装 JDK&Scala:
需要在 Windows本地安装 JDK 8或者 JDK8 以上版本。
需要在 Windows本地安装 Scala2.12。
加载源码:
将 kafka-3.0.0-src.tgz源码包,解压到非中文目录。
例如:D: 1_softwarekafka-3.0.0-src。
打开 IDEA,点击 File->Open…->源码包解压的位置。
安装 gradle:
Gradle是类似于 maven 的代码管理工具。安卓程序管理通常采用 Gradle。
IDEA 自动帮你下载安装,下载的时间比较长(网络慢,需要 1 天时间,有 VPN 需要几分钟)。
二、生产者源码 1、初始化
生产者sender线程初始化:
程序入口:
从用户自己编写的 main 方法开始阅读
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092,hadoop103:9092");
// 指定对应的 key 和 value 的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Strin
gSerializer.class.getName());
// 1 创建 kafka 生产者对象
// "" hello
KafkaProducer kafkaProducer = new
KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","atguigu"+i));
}
// 3 关闭资源
kafkaProducer.close();
}
}
生产者 main 线程初始化:
点击 main()方法中的 KafkaProducer()。
KafkaProducer.java:
跳转到 KafkaProducer构造方法:
生产者 sender 线程初始化:
发送数据到缓冲区:
发送总体流程:
分区选择:
发送消息 大小 校验:
内存池:
sender 线程 发送 数据:
三、消费者源码
消费者组初始化流程:
消费者组详细消费流程:
消费者初始化:
程序入口:
消费者初始化:
消费者订阅主题:
消费者拉取 和处理 数据:
消费 总体流程:
消费者/ 消费者组初始化:
拉取数据:
发送请求并抓取数据:
把数据按照分区封装好后,一次处理 最大条数认 默认 500 :
拦截器处理数据:
消费者 Offset 提交:
手动同步提交 Offset:
手动异步提交 Offset:
四、服务器源码
Kafka Broker总体工作流程:
程序入口:



