栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Day550.kafka源码主内容解析 -kafka

Day550.kafka源码主内容解析 -kafka

kafka源码主内容解析 一、源码环境准备

源码下载地址


安装 JDK&Scala:

需要在 Windows本地安装 JDK 8或者 JDK8 以上版本。
需要在 Windows本地安装 Scala2.12。

加载源码:

将 kafka-3.0.0-src.tgz源码包,解压到非中文目录。

例如:D:1_softwarekafka-3.0.0-src。

打开 IDEA,点击 File->Open…->源码包解压的位置。


安装 gradle:
Gradle是类似于 maven 的代码管理工具。安卓程序管理通常采用 Gradle。

IDEA 自动帮你下载安装,下载的时间比较长(网络慢,需要 1 天时间,有 VPN 需要几分钟)。


二、生产者源码

1、初始化

生产者sender线程初始化:

程序入口:

从用户自己编写的 main 方法开始阅读

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
	public static void main(String[] args) {
	// 0 配置
	Properties properties = new Properties();
	// 连接集群 bootstrap.servers
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
	:9092,hadoop103:9092");
	// 指定对应的 key 和 value 的序列化类型 key.serializer
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Strin
	gSerializer.class.getName());
	// 1 创建 kafka 生产者对象
	// "" hello
	KafkaProducer  kafkaProducer  =  new
	KafkaProducer<>(properties);
	// 2 发送数据
	for (int i = 0; i < 5; i++) {
	kafkaProducer.send(new
	ProducerRecord<>("first","atguigu"+i));
	}
	// 3 关闭资源
	kafkaProducer.close();
	}
}

生产者 main 线程初始化:

点击 main()方法中的 KafkaProducer()。

KafkaProducer.java:

跳转到 KafkaProducer构造方法:







生产者 sender 线程初始化:





发送数据到缓冲区:

发送总体流程:





分区选择:


发送消息 大小 校验:


内存池:


sender 线程 发送 数据:










三、消费者源码

消费者组初始化流程:

消费者组详细消费流程:

消费者初始化:

程序入口:


消费者初始化:





消费者订阅主题:



消费者拉取 和处理 数据:

消费 总体流程:

消费者/ 消费者组初始化:



拉取数据:

发送请求并抓取数据:




把数据按照分区封装好后,一次处理 最大条数认 默认 500 :



拦截器处理数据:

消费者 Offset 提交:

手动同步提交 Offset:



手动异步提交 Offset:



四、服务器源码

Kafka Broker总体工作流程:

程序入口:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752773.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号