栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Kafka源码】要点总结

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Kafka源码】要点总结

本文尚在更新中,未完结

说明
  • 首先需要搭建源码环境,不多说了,网上有很多,比如:https://www.cnblogs.com/jun1019/p/7989127.html
  • 注释的代码上传到了:https://gitee.com/HanFerm/kafka-source-2.1.0
  • 因为源码一篇文章很难讲清楚,而且刚开始阅读只阅读大体步骤比较好,而后续就应该进入调用栈里深层次阅读了,基于这个矛盾关系,所以本文只保证基本的流程,而不保证调用栈的线性关系。所以本文主要以知识点为单位划分,这样在你亲自阅读源码时遇到疑惑的可以Ctrl+F直接搜索,也方便碎片化阅读。总之按着标题来选择感兴趣的内容是最佳的阅读方式,而非以线性的方式阅读本文。
流程

核心组件

(1)核心组件:Partitioner,用来决定每条消息是路由到Topic的哪个分区里

(2)核心组件:metadata,这个是对于生产端来说非常核心的一个组件,他是用来从broker集群去拉取元数据的Topics(Topic -> Partitions(Leader+Followers,ISR)),后面如果写消息到Topic,才知道这个Topic有哪些Partitions,Partition Leader所在的Broker

  • 初始化拉取:初始化的时候,直接调用metadata组件的方法,去broker上拉取了一次集群的元数据过来
  • 元数据定时更新:每隔一小段时间就再次发送请求刷新元数据,metadata.max.age.ms,默认每隔5分钟一定会强制刷新一下
  • 元数据临时拉取:还有在发送消息的时候,如果发现你要写入的某个Topic对应的元数据不在本地,那么也会主动发送请求到broker尝试拉取这个topic对应的元数据,如果你在集群里增加了一台broker,也会涉及到元数据的变化

(3)核心参数:每个请求的最大大小(1mb),缓冲区的内存大小(32mb),重试时间间隔(100ms),缓冲区填满之后的阻塞时间(60s),请求超时时间(30s)

(4)核心组件:RecordAccumulator,缓冲区,负责消息的复杂的缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb)

  • 默认情况下,如果光光是考虑batch的机制的话,那么必须要等到足够多的消息打包成一个batch,才能通过request发送到broker上去;
  • 但是,如果你发送了一条消息,但是等了很久都没有达到一个batch大小,所以说要设置一个linger.ms(逗留时间),比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去

(5)核心组件:网络通信的组件,NetworkClient,一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)

(6)核心组件:Sender线程,负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1代表只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),sender线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动

(7)核心组件:序列化组件,拦截器组件

初始化

KafkaProducer在初始化的时候是不会去拉取集群的元数据的,做了一个最最基本的初始化,也就是仅仅把我们配置的那个broker的地址放了进去,在客户端缓存集群元数据的时候,采用了哪些数据结构

发送

https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

KafkaProducer.javadoSend

发送的

我们先大致来看一下KafkaProducer.send()方法发送消息的时候,他源码里大致的运行的流程,先来窥探一下

(1)回调自定义的拦截器

(2)同步阻塞等待获取topic元数据

如果你要往一个topic里发送消息,必须是得有这个topic的元数据的,你必须要知道这个topic有哪些分区,然后根据Partitioner组件去选择一个分区,然后知道这个分区对应的leader所在的broker,才能跟那个broker建立连接,发送消息

调用同步阻塞的方法,去等待先得获取到那个topic对应的元数据,如果此时客户端还没缓存那个topic的元数据,那么一定会发送网络请求到broker去拉取那个topic的元数据过来,但是下一次就可以直接根据缓存好的元数据来发送了

(3)序列化key和value

你的key和value可以是各种各样的类型,比如说String、Double、Boolean,或者是自定义的对象,但是如果要发送消息到broker,必须对这个key和value进行序列化,把那些类型的数据转换成byte[]字节数组的形式

(4)基于获取到的topic元数据,使用Partitioner组件获取消息对应的分区

(5)检查要发送的这条消息是否超出了请求最大大小,以及内存缓冲最大大小

(8)设置好自定义的callback回调函数以及对应的interceptor拦截器的回调函数

(7)将消息添加到内存缓冲里去,RecordAccumulator组件负责的

(8)如果某个分区对应的batch填满了,或者是新创建了一个batch,此时就会唤醒Sender线程,让他来进行工作,负责发送batch

刚开始他没有去拉取集群的元数据,而是在后面根据你发送消息时候的需要,要给哪个topic发送消息,再去拉取那个topic对应的元数据,这就是懒加载的设计思想,按需加载思想

对消息的大小,是否超出请求的最大大小,是否会填满 内存缓冲导致内存溢出,对一些核心的请求数据必然要进行严格的检查

异步发送请求,通过先进入内存缓冲,同时设置一个callback回调函数的思路,在发送完成之后来回调你的函数通知你消息发送的结果,异步运行的后台线程配合起来使用,基于异步线程来发送消息

  • clusterAndWaitTime = waitonmetadata(record.topic(), record.partition(), maxBlockTimeMs);
  • Cluster cluster = clusterAndWaitTime.cluster;
  • serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key())
  • serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
  • int partition = partition(record, serializedKey, serializedValue, cluster);
  • tp = new TopicPartition(record.topic(), partition);
  • int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
    compressionType, serializedKey, serializedValue, headers);
  • ensurevalidRecordSize(serializedSize);
  • Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
  • RecordAccumulator.RecordAppendResult result =
    accumulator.append(tp, timestamp, serializedKey,
    serializedValue, headers, interceptCallback, remainingWaitMs);
  • this.sender.wakeup();
  • return result.future;
阻塞时间

maxBlockTimeMs,决定了你调用send()方法的时候,最多会被阻塞多长时间,所以这个方法决定了你的send在一些异常的情况下,比如说拉取topic的元数据,结果跟broker网络有问题,在一段时间后还是拉取不到

在你把数据放到内存缓冲的时候,如果内存缓冲满了,此时最多就只能阻塞这么长时间就必须返回了,如果你希望send()方法被阻塞的时间可以延长或者缩减,此时你可以自己去动手配置这个参数

在客户端的方法尝试等待获取topic元数据的过程中,核心的逻辑,就是说先必须唤醒Sender线程,然后呢就会通过一个while循环,直接去wait释放锁,尝试最多就是等待默认的60s的时间

topic元数据的拉取,是走的是异步的方式的,但是对异步的结果进行同步的阻塞的等待,他其实唤醒Sender线程,就是本质上就是在让那个Sender线程去从broker拉取对应的topic的元数据

如果拉取成功了,那么集群元数据的版本号version一定会累加,所以只要判断version版本号还没有累加,就说明此时Sender县城关还没有成功的拉取元数据,此时就是在主线程里,就是要wait阻塞等待最多60s即可

接下来肯定是分为两种情况:

(1)Sender线程成功的在60s内把topic元数据加载到了,然后缓存到了metadata里去,更新了version版本号,而且此时一定会尝试把wait阻塞等待的主线程给唤醒,让主线程直接返回阻塞等待的时长

(2)如果wait(60s)一直超时了,你的Sender线程都没加载成功元数据,此时人家在60s后自动醒来了,此时会直接超时抛异常

底层的网络通信组件,请求是如何发送的,响应是如何接收的,这些东西都对应了一套kafka底层的网络通信的一套东西,如果我们现在去琢磨和研究他的话,其实是不靠谱的,非常的复杂

没有必要去死扣底层的细节,只要知道在发送消息的时候,如果一个topic的元数据么有,此时会发送请求去broker拉取元数据以及缓存在客户端即可

既然已经有了元数据了,接下来就可以进行分区路由了,把每个消息要路由到某一个分区去,默认有一个DefaultPartitioner,这个里面他实际上是会去负责默认的分区路由的策略,支持三种,指定分区,指定分区key,或者不指定分区key

AtomicInteger,初始值是一个随机的integer类型的数字,接下来默认是递增的,一定会保证是一个正整数,就是比如说topic有5个分区,就会对这个递增的数字(23),对topic的分区数量进行取模

就是根据这个递增的数字23,路由到5个分区中的1个,3

接下来一条消息,就会递增counter,比如说就是24,4;25 -> 0;26 -> 1;27 -> 2;28 -> 3;29 -> 4;30 -> 0,所以说采用了一个counter递增的方式,不断的用一个递增的数字来对分区的数量进行取模

保证在不指定分区key的情况下,所有的消息会均匀的分发到各个分区中去

send()方法调用sender线程

KafkaThread封装sender,sender封装NetworkClient

逻辑和线程

NetworkClient.poll(time,now)

假设指定了分区key

此时,kafka会通过自己的工具类,murmur2,实现一个算法,将一个字节数组转换为一个hash值,你的分区key,比如说订单id就会被通过murmur2算法转换为一个int类型的hash值

只要你的分区key是一样的,比如说是同样的订单id,此时就一定会生成相同的hash值,接下来就用hash值对分区数量进行取模,就可以保证说只要是分区key相同,hash值一定也相同,路由到的分区一定是相同的

对于如果你要保证说发送出去的消息按照一定的规律严格是有序的,比如说mysql binlog就一定要严格按照这个模式来发送,就一定是要按照数据库里的表主键id来作为分区key进行发送

同一条数据的增删改的binlog都是进入到同一个分区的,才能拿到正确的顺序

如何将消息写入内存缓冲里面,先大致浏览一下里面的流程,然后再逐个击破一点一点的去看他,这个里面一定要关注的一点就是说,kafka客户端设计是如何管理自己的内存的,如何基于内存里的数据结构构造一个缓冲区

如何基于缓冲区去承载写入进去的消息,以及batch批处理的机制,消息聚合成batch的机制,整个这套机制是如何来实现的

KafkaProducer设计的理念就是多线程并发安全的,可以让多个线程并发的来调用KafkaProducer还保证数据不会错乱的,所以说是可能会有多个线程并发的来调用他的send()方法的

一个batch就对应了一块内存空间,这里要放一堆的消息,batchSize默认的大小是16kb,如果你的消息最大的值是1mb,如果说你的消息大于了16kb的话,就会使用你的消息的大小来分配一块内存空间

否则如果你的消息是小于16kb的话,那么就会基于16kb来分配内存空间

你在实际生产环境,request.max.size,batch.size是必须要调优的,你必须要根据自己实际发送的消息的大小来设置request.max.size和batch.size,如果你的消息频繁的是超过了batch.sizse的话

一个batch就一条消息,batch打包的机制还有意义吗?每条消息都对应一次网络请求

ReentrantLock他比较好的地方就是可以通过API灵活的控制加锁和释放锁,在这里,BufferPool这里是需要这样灵活的加锁和释放锁的,synchronized效果是一样,代码块的范围来加锁和释放锁

进入synchronized代码块就加锁,出这个代码块就释放锁

BufferPool里是有一个Deque作为队列,缓存了一些ByteBuffer,也就是缓存了一批内存空间,可以用来复用的,就是说他会缓存一批ByteBuffer,每个ByteBuffer都是16kb,默认的batch大小

Dequeu里的ByteBuffer的数量 * 16kb = 已经缓存的内存空间的大小,0

availableMemory就是剩余的还可以使用的内存空间的大小,32mb,此时需要使用掉一块内存空间,减去batchSize的大小,32mb - 16kb,接下来就直接返回ByteBuffer分配出来的一块16kb大小的内存空间

如果当前内存空间还可以分配新的ByteBuffer,那么就是上述的运行逻辑

tryAppend方法,其实是把消息尝试写入Dequeu的最近一个batch中,但是如果Dequeu是空的,这个方法会失败,并发课里讲解过,其实可能会有多个线程并发的执行,多个线程都可能分别拿到一个16kb的ByteBuffer

3个线程,线程1,线程2,线程3,这3个线程都会获取到一个16kb的ByteBuffer内存

假设线程2进入了synchronized代码块里面去,基于16kb的ByteBuffer构造一个batch,放入Dequeue中,就成功了

接着线程3进入了synchronized代码块里面去,直接把消息放入Dequeu中已有的一个batch里去,那么他手上的一个16kb的ByteBuffer怎么办?在这里就会把这个16kb的ByteBuffer给放入到BufferPool的池子里去,保证内存可以复用

Sender

KafkaThread封装sender,sender封装NetworkClient

分区规则

https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

Record

https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/record/Record.java

缓存池

入口还是KafkaProducer.doSend

然后看下面的步骤

RecordAccumulator.RecordAppendResult result =
    accumulator.append(tp, timestamp, serializedKey,
                       serializedValue, headers, interceptCallback, remainingWaitMs);

然后我们进入accumulator.append()方法

https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

RecordAccumulator这个类我们主要关注append()方法,他会多次使用tryAppend()方法来将消息放入队列,但是tryAppend()方法有个问题是它不会创建batch,而之会用现成的batch(新创建的或者别的MQ没有用完的空间),所以需要append()来完成创建batch的操作

batch、队列、内存池的关系

内存池>分区的队列>batch

而free队列是整个kafka共享的

给batch分配内存

入口:doSend–>RecordAccumulator.append–> free.allocate(size, maxTimeToBlock);

代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java

batch引用着每个小buffer

分配batch大小的时候会分两种情况来分配:

  • 正常batch:如果16KB够用,就分配16KB。并且如果free队列有空batch,优先使用空batch,没有空batch则缩减nonPooledAvailableMemory
  • 大batch:如果MQ大小>16KB,则使用MQ大小作为batch。永远跟free队列无关,只使用nonPooledAvailableMemory,并且如果nonPooledAvailableMemory不够用还会尽可能少地释放free队列的batch
    • 如果最终还不够用,会阻塞添加进waiters,如果超过maxBlockMs就抛异常。当有batch内存放回来bufferPool的时候就唤醒waiters。具体后面讲

分配完batch就可以调用batch.tryAppend()成功了

回收:

  • 正常batch:回收到free队列
  • 大batch:使用立即回收,直接进入内存池
// 如果是16KB,clear后放到free列表中
if (size == this.poolableSize && size == buffer.capacity()) {
    buffer.clear();
    this.free.add(buffer);
} else {
    // 如果不是16KB,如1M,直接放回nonPooledAvailableMemory,而不是放回内存池
    this.nonPooledAvailableMemory += size;
}

双重检查:2个线程都去申请内存时,因为有如下的double check,所以不会分配2个batch,但是问题在于已经分配过空间了,那么线程2的空间应该释放掉。在2.1版本中解决方案是:如果空间分配过了,buffer对象被batch引用,然后buffer=null赋值为空。在finally块中,if(buffer!=null)释放buffer。这块代码去这里查看吧。https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

申请不到够大的batch

如果最终还不够用,会阻塞添加进waiters,如果超过maxBlockMs就抛异常。当有batch内存放回来bufferPool的时候就唤醒waiters。

  • 创建condition:Condition moreMemory = this.lock.newCondition();
  • 添加进waiters:waiters.addLast(moreMemory);
  • while(获得大小<需求大小)
    • 是否超时 = !await(时间)
      • 谁来通知await:sender完成
    • await超时的话抛异常
    • 优先是否free队列,free队列没了的时候再拿nonPooledAvailableMemory
Condition moreMemory = this.lock.newCondition();
this.waiters.addLast(moreMemory);
给batch追加MQ

上个知识点:给batch分配内存 以及 获取还有空间的batch

相关代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java

入口:doSend–>申请buffer–>封装buffer为batch–>将MQ添加进batch–>ProducerBatch.tryAppend()

tryAppend的线程安全由上层的synchronized (dq)保证,也就是对batch操作要锁队列

队列

在kafka中队列被CopyOnWriteMap封装着,Map<分区,队列>,它使用了读写分离的思想,只有新创建队列时才是写,下面几种说法等价

  • put时为写
  • 为分区创建队列时才是put,为队列添加batch并不是put

https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java

还是跟随消息收集器来追溯,分区与batch的对应关系是通过CopyOnWriteMap来保存的

// 而RecordAccumulator构造器中
batches = new CopyOnWriteMap>();

CopyOnWriteMap是自己定义的,实现线程安全和读写分离

public class CopyOnWriteMap implements ConcurrentMap {
    private volatile Map map;

    @Override
    public synchronized V put(K k, V v) {
        // 开辟新的空间实现读写分离
        Map copy = new HashMap(this.map);
        // 插入数据
        V prev = copy.put(k, v);
        // 替换
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

要发送消息时,通过map.get(分区)获取对应的队列,没有则创建队列,创建分区的队列就是map.put(分区,队列)

他会基于BufferPool给这个batch分配一块内存出来,之所以说是Pool,就是因为这个batch代表的内存空间是可以复用的,用完一块内存之后会放回去下次给别人来使用,复用内存,避免了频繁的使用内存,丢弃对象,垃圾回收

已经可以往Deque队列里写入消息了,已经有一个新分配的batch了(对应了BufferPool分配的一块内存空间)

KafkaProducer设计的理念就是多线程并发安全的,可以让多个线程并发的来调用KafkaProducer还保证数据不会错乱的,所以说是可能会有多个线程并发的来调用他的send()方法的

他会从内存缓冲区里获取一个分区对应的Deque,这个Deque里是一个队列,放了很多的Batch,就是这个分区对应的多个batch,CopyOnWrite这个东西,我们在并发课程里,讲解过CopyOnWriteArrayList

就是说,适合的是读多写少的场景,每次更新的时候,都是copy一个副本,在副本里来更新,接着更新整个副本,好处就在于说写和读的操作互相之间不会有长时间的锁互斥,写的时候不会阻塞读

坏处在于说对内存的占用是很大的,适合的是读多写少的场景,大量读的场景就直接基于快照副本来进行读取的,CoypOnWriteMap也是类似的思路,一个分区创建一个Deque,其实是频次很低的写行为

大量的主要还是在读取,就是去大量的从map里读取一个分区对应的Deque,最后高并发频繁更新的就是分区对应的那个Deque,读的时候基于快照来读即可,所以这种场景非常适合使用CopyOnWrite系列的数据结构

就是说,适合的是读多写少的场景,每次更新的时候,都是copy一个副本,在副本里来更新,接着更新整个副本,好处就在于说写和读的操作互相之间不会有长时间的锁互斥,写的时候不会阻塞读

坏处在于说对内存的占用是很大的,适合的是读多写少的场景,大量读的场景就直接基于快照副本来进行读取的,CoypOnWriteMap也是类似的思路,一个分区创建一个Deque,其实是频次很低的写行为

大量的主要还是在读取,就是去大量的从map里读取一个分区对应的Deque,最后高并发频繁更新的就是分区对应的那个Deque,读的时候基于快照来读即可,所以这种场景非常适合使用CopyOnWrite系列的数据结构

如果说此时还没有创建对应的batch,此时会导致放入Deque会失败

他会基于BufferPool给这个batch分配一块内存出来,之所以说是Pool,就是因为这个batch代表的内存空间是可以复用的,用完一块内存之后会放回去下次给别人来使用,复用内存,避免了频繁的使用内存,丢弃对象,垃圾回收

已经可以往Deque队列里写入消息了,已经有一个新分配的batch了(对应了BufferPool分配的一块内存空间)

他内存中就是一个最最普通的,非线程安全的Map数据结构,但是他把这个数据结构定义为volatile类型,就可以保证可见性,就是只要有人更新了这个引用变量对应的实际的map对象的地址,就可以立即看到

读的时候是完全不用加锁的,多个线程并发进来,高并发的执行读的操作,在这里完全是没有任何的互相之间的影响的,可以实现高并发的读,没有锁在这里。如果队列已经存在了,直接返回即可

多个线程会并发的执行putIfAbsent方法,在这个方法里可以保证线程安全的,除非队列不存在才会设置进去,在put方法的时候是有synchronized,可以保证同一时间只有一个线程会来更新这个值

为什么说写数据的时候不会阻塞读的操作,针对副本进行kv设置,把副本通过volatile写的方式赋值给对应的变量,并发之类的课要学精,否则学大数据无从谈起,跳看,直接来看kafka源码,一定是搞不定的

batch发送时机

可以看下面的batch是否该发送了的小节

前提知识点:给batch追加MQ

入口:doSend–>给batch追加MQ,即accumulator.append(–>唤醒sender线程–>异步线程sender.run

从下面的代码可以看出来,是batch满了或者后一个batch已经被创建好了

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    
    this.sender.wakeup();
}
batch什么时候满

ProducerBatch.tryAppend里有recordsBuilder.hasRoomFor(

是介于MemoryRecordsBuilder.java这个类来判断的,总体就一句this.writeLimit >= estimatedBytesWritten() + recordSize;即容量<需要的大小

然后buffer.flip从写转成读

发送MQ

前提知识点:batch发送时机

入口:给batch追加MQ后唤醒sender线程–>sender继续执行run

sender.run()

void run(long now) {
    
    long pollTimeout = sendProducerData(now);
    
    client.poll(pollTimeout, now);
}

所以我们进入sendProducerData(now);方法,去该发送批次要发送那些batch

准备发送内容

前提知识点:发送MQ

入口:入口:给batch追加MQ后唤醒sender线程–>sender继续执行run–>准备发送的内容,即Sender.sendProducerData–>RecordAccumulator.ready

遍历所有的分区队列–>找到leader

  • 找到leader:取出队头batch,进行判断batch是否该发送了
    • 也就是说,是否可以发送是以leader为单位的,准备好也是readyNodes.add(leader)
    • 判断batch是否可以发送出去不是判断哪个batch可以发送了,也不是用分区来判断的,而是判断broker
  • 没找到leader:unknownLeaderTopics.add(part.topic());

把判断为发送的batch添加到集合readyNodes.add(leader);

发送的单位

对每个Broker都创建一个ClientReqeust,包括了多个Batch,就是在这个Broker上的多个Leader Partition所对应的Batch,聚合起来组成一个ClientRequest,形成一个请求,发送到Broker上去

如果此时判断出来这个Batch是可以发送出去的,此时就会将这个Batch对应的那个Partiton的Leader Broker给放入到一个Set里去,他在这里找的不是说找哪些Partition可以发送数据,也不是找Batch

他在这里找的是哪些Broker有数据可以发送过去,而且通过Set进行了去重,可能对于一个Broker而言,是有多个Partiton的Batch可以发送过去的

代码编写的技巧,如果你的方法要返回的是一个复杂的数据结构,此时可以定义一些Bean,里面封装你要返回的数据,哪些Broker可以发送数据过去,下一次来检查是否有Batch可以发送的时间间隔,是否有Partiton还不知道自己的Leader所在的Broker

判断batch是否该发送了

这里不要考虑是否支持重试,全部支持重试

总的判断为

boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
  • full:batch满的batch
    • 该batch不是队尾了 || batch的full标志位为true
  • expired:超过了重试和逗留时间,这两个事件用timeToWaitMs整体表示
    • 第一次:batch的attempts参数为0,所以不是重试,timeToWaitMs=逗留时间。看的是逗留时间
    • 第二次:失败时间<要求的重试时间,则不重试,防止重试太频繁timeToWaitMs=逗留时间;超过时间可以重试了,则timeToWaitMs=重试时间
  • exhausted:有waiters申请不到内存了,赶紧发送出去吧
  • closed:生产者关闭了
  • flushInProgress:是否有现车在等待flush

判断为该发送后添加的是broker,而不是batch和分区

逗留时间和重试时间
  • 逗留时间:lingerMs逗留时间到了必须发送出去,默认是0,表明不需要等待,来一条消息就发送一条消息,很明显不合适。所以我们在发送数据的时候,一定记得去配置这个参数,假设配置100ms,表示最多等待100ms后必须发送出去
    • 第一次发送数据:attempts()==0->backingOff=false,截止时间为逗留时间lingerMs。
    • 第二次发送数据:重试时间还没到也是逗留时间
  • 重试时间:第一次发送MQ失败后,必须超过重试间隔时间后才能再次重试,防止重试太频繁

前提知识点:判断batch是否该发送了

boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;

long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
筛选可以发送的batch

前提知识点:准备发送内容

入口:给batch追加MQ后唤醒sender线程–>sender继续执行run–>准备发送的内容,即Sender.sendProducerData–>RecordAccumulator.ready

因为有的分区没找到leader信息,所以要把这些分区的batch去除

网络是否准备好

入口:sender要发送MQ

代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

NetworkClient.ready()

  • 先从缓存中看是否已经有该node的链接了
  • 缓存中没有则尝试建立连接该node

网络没有建立好会发送MQ吗?

发送MQ前会检查网络是否建立好,没有建立则先建立网络。

重新回到Sender.sendProducerData()里的ready逻辑

第一次进入ready时,虽然会初始化网络,但是还是return false,代表没有ready,从而remove掉该node,代表不发送MQ

最后回到sender.run()的第二个方法 client.poll,这时候不不发送,所以下一次循环run才发送

总结:初始化网络和发送MQ不再一次run中执行

kafka网络设计

sender拉取元数据和发MQ都涉及网络

主要类是NetworkClient

前提知识点:网络是否准备好

入口:构建sender线程–>封装NetworkClient–>封装selector,即NIO

使用:sender发送MQ或拉取元数据

代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/network/Selector.java

public class Selector implements Selectable {
    
    private final java.nio.channels.Selector nioSelector;

初始化链接

前提知识点:网络是否准备好

代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/network/Selector.java

@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    ensureNotRegistered(id);
    SocketChannel socketChannel = SocketChannel.open();

    configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
    // channel.connect(address);
    boolean connected = doConnect(socketChannel, address);
    SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

    if (connected) {
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }

}

但是第一次只是创建了channel,还没有建立连接,所以取看client.poll,在里面有selector.poll来建立连接

二进制写ByteBuffer
  • offset
  • size
  • crc
  • magic valye
  • attribute
  • timestamp
  • key.length
  • key
  • value.length
  • value

ByteBufferOutputStream包裹了ByteBuffer,持有一个针对ByteBuffer的输出流,接着会把ByteBufferOutputStream给包裹在一个压缩流里,gzip、lz4、snappy,如果是包裹在压缩流里,写入的时候会先进入压缩流的缓冲区

压缩流会把一条消息放在缓冲区里,用压缩算法给压缩了,再写入底层的ByteBufferOutputStream里去

如果是非压缩的模式,最最普通的情况下,就是DataOutputStream包裹了ByteBufferOutputSteram,然后写入数据,Long、Byte、String,都会在底层转换为字节进入到ByteBuffer里去

内存空间管理的方式,包括他有内存缓冲的核心数据结构,内存缓冲池,ByteBuffer,如何通过IO流将数据写入ByteBuffer的,如何按照二进制协议规范来写一条消息的

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/361454.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号