栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka核心概念详解

Kafka核心概念详解

前言

本篇文章介绍KafKa的核心概念,其中包含了topic、生产者、消费者、leader选举的机制、消息如何进行分片选择,以及流式持久化存储原理,以及惊群效应产生及解决办法等

核心API

在java客户端要使用Kafka,我们最简单,也是最常用的,使用springboot提供的start,自动给我们管理,还有直接使用jar包的方式,当然最后还有一种是使用maven依赖去下载,其实总的来说都是为我们下载 kafka-clients  

 
    org.apache.kafka 
    kafka-clients 
    2.3.0 

而客户端中使用方式采用的 创建 删除 修改 topic 都是采用 AdminClient

Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.120.41:9092");

		try (AdminClient admin = AdminClient.create(props);) {
			admin.createTopics(Collections.singletonList(new NewTopic("test", 1, (short) 1)));
		}

核心的创建完topic就能进行生产和消费

然后配置这些 可配可不配,都有默认值的。

如果是生产者 则使用 KafkaProducer 进行发送 ,需要立即返回的,ProducerRecord

producer采用异步批量的方式来发送消息,send方法会立即返回。

	try (Producer producer = new KafkaProducer<>(props);) {
Future resultFuture = producer.send(new ProducerRecord("test", Integer.toString(i), message));

				// 如果你想要同步阻塞等待结果
				Recordmetadata rm = resultFuture.get();

如果是生产者 则是KafkaConsumer   采用拉的模式进行拉数据,默认就是异步的。

try (KafkaConsumer consumer = new KafkaConsumer<>(props);) {
	// kafka中是拉模式,poll的时间参数是告诉Kafka:如果当前没有数据,等待多久再响应
				ConsumerRecords records = consumer.poll(Duration.ofSeconds(1L));

对于消费者或者提供者的参数配置  可以按照下面官网中配置参数进行查找

阿帕奇·卡夫卡 (apache.org)

包括消息数据的序列化器   value.serializer  等等需要自定义则设置,

	// 消息数据的序列化器
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

以及提供者 中存放数据的大小一定是比批量发送大小大很多的,原理也是来自我们可以多线程发送的数据,与总的未发送数据内存之间的区别

// 存放数据的buffer的大小
		props.put("buffer.memory", 33554432);

// Producer是采用批量的方式来提高发送的吞吐量量的,这里指定批大小,单位字节
		props.put("batch.size", 16384);

订阅topic    并且拉数据 ,kafka中是拉模式,poll的时间参数是告诉Kafka:如果当前没有数据,等待多久再响应  设置 

consumer.poll(Duration.ofSeconds(1L));
Producer
  • KafkaProducer
  • ProducerRecord
  • ProducerConfifig
  • Serializer

对于上面的一些常见的配置,都是默认的,可以不用配置

Consumer
  • KafkaConsumer
  • ConsumerConfifig
  • ConsumerRecord
  • Deserializer

序列化和反序列化

序列化器

反序列化器 

核心概念 topic

在kafka中有 topic的概念,类似与 rabbitmq中通道的概念,但还是不一样的,也包含了类型的区分

[root@node4 latest]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic my-33-topic
[root@node4 latest]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-33-topic Topic:my-33-topic PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824 
    Topic: my-33-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 
    Topic: my-33-topic Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 
    Topic: my-33-topic Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
  • Topic :主题 ,一类消息(数据)
  • partition:一个Topic可以分成多个分片来分布式存放数据,分片以顺序号来编号。
  • Replicas:为保证数据存储的可靠性,一个分片可以存储多个副本(一般3个),副本被自动均衡 分布在集群节点上
  • Leader:一个分片的多个副本中自动选举一个作为Leader,通过Leader操作数据,Leader同步 给其他副本,以此来保证一致性。当Leader挂了时,自动选择一个做Leader。
  • Topic的这些元信息存储在Zookeeper上。
  • Replicas: 0,2,1 副本在哪些broker上 
  • Isr: 0,2,1 副本 存活且同步的broker

都是内部进行操作的。

Leader选举

每个分片的leader如何产生,在kafka中,基于zk ,进行临时节点 +watch ;因为惊群效应以及依赖性太大,节点一旦多了惊群效应影响性能,Kafka没有直接使用zk来进行分片的Leader选举,

选择是采用Kafka中增加一个角色: Controller , 由集群的一个broker来担任这个角色;这个controller会连接到任何节点上去

这里就是真正用的zk 来进行 Controller的选举。 然后所有的 主题的分片的副本的分布、leader的选定都由Controller来完成。 

控制源数据的变更

 消息的分片选择

消息过来,一定需要去选择集群 到底该选择那个分片,这是Kafka中消息分片的选择。这我最开始的想法是利用redis中hash槽, 相似的原理,利用重定向也好都可以使用的。

在kafka中选择的是Producer客户端负责消息的分发

  • kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含”集群 中存活的servers列表”/”partitions leader列表”等信息;
  • 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket 连接;
  • 消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”,事实上,消息被路 由到哪个partition上由producer客户端决定;比如可以采用”random”“key-hash”“轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的。

选择实现达到均衡分布。

消息的分片选择规则:
  • 用户给定了分片号且正确有效,则发到给定分片;
  • 未指定分片,指定了Key,则对Key取Hash 求余决定目标分片
  • 未指定分片,也未提供key,则采用轮询

ProducerRecord  

可以选择,带整数进行分片的。

 

只要能均匀将消息均匀的分布到分片上去。每个topic 可以有多个分片。

客户端直接发消息都发给leader ,在进行同步

分片数据持久化存储原理

首先Kafka 是采用文件来存储数据 ,数据量大 ;这是它存储的方式;

磁盘文件组织方式

Kafka 是一个分布式的流数据存储平台,它 将 流数据以日志的方式顺序存储在磁盘文件 中。数据文件的 数据存放目录下的组织方式为:

日志的 格式包括  数据的存储   索引文件   
[root@node4 kafka-logs]# ll my-33-topic-0 总用量 4 
-rw-r--r--. 1 root root 10485760 8月 25 15:41 00000000000000000000.index 
-rw-r--r--. 1 root root 0 8月 25 15:41 00000000000000000000.log 
-rw-r--r--. 1 root root 10485756 8月 25 15:41 00000000000000000000.timeindex 
-rw-r--r--. 1 root root 8 8月 25 15:41 leader-epoch-checkpoint
像写日志一样,追加流数据。 文件的顺序写 远快于 随机写 ;采用追加的方式,速度非常快的。 消息数据是顺序追加到 .log 文件中,这用写入速度非常快。
  • 日志文件名 这串 00000000000000000000 表示什么
二进制的日志文件名,文件分割用了过后,并且是offset来当作偏移量的。
  • 在这个日志文件中怎么存储数据,怎么知道一条消息的结尾。
存得都是字节 offffset 偏移量 记录 4 个字节(消息内容的长度) + 消息内容(字节序列)  这是在kafka中采用的方式 日志文件数据存储格式 每个日志文件都是 “log entries” 序列,每一个 log entry 包含一个 4 字节整型数(值为 N ),其后跟 N 个字节的消息体。每条消息都有一个当前partition 下唯一的 64 字节的 offffset ,它指明了这条消息的起始位置。磁盘上存储的消息格式如下: 消息长度 : 4 bytes (value: 1 + 4 + n) 版本号 : 1 byte CRC 校验码 : 4 bytes 具体的消息 : n bytes

每条消息都有一个当前partition下唯一的64字节的offffset,它指明了这条消息的起始位置。 那这个offffset值存哪里? 这个信息存储到 .index 索引文件中 。 下面是一个消息的偏移量图示

从那个文件开始读,这里记录第几条数据,并且长度也记录着,偏移量相对于整个分片的。 

索引中存储的数据的结构是怎样的? { 消息序号,存储偏移地址 } 索引没必要一条都存,而是分部分存,稀疏索引,节省存储空间,

 

kafka作为一个分布式的流数据存储平台,它能存储海量的消息数据,那一个分片的数据可能 会很大吗? 有可能一个分片的数据是很大的。这个很大的文件进行分割了。 Segment 段 分片分成多个段(一个 .log 文件)来存储,段的大小固定(可以指定) 

每个 partion( 目录 ) 相当于一个巨型文件被平均分配到多个大小相等 segment( 段 ) 数据文件中,每个 segment 文件名为该 segment 第一条消息的 offffset 和 “.log” 组成。但每个段 segment fifile 消息数量不一定 相等,这种特性方便old segment fifile 快速被删除。(默认情况下每个文件大小为 1G ) 每个 partiton 只需要支持顺序读写就行了, segment 文件生命周期由服务端配置参数决定。 这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。 消息什么时候删除
  • 通过在server.properties文件中配置全局默认的日志保留策略来控制:
支持两种策略:时间 和 大小 。 可多策略,哪个达到即哪个生效。  这两个都可以生效

 全局默认的方式

在定义 Topic 时指定对应参数
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389397.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号