说明本文尚在更新中,未完结
- 首先需要搭建源码环境,不多说了,网上有很多,比如:https://www.cnblogs.com/jun1019/p/7989127.html
- 注释的代码上传到了:https://gitee.com/HanFerm/kafka-source-2.1.0
- 因为源码一篇文章很难讲清楚,而且刚开始阅读只阅读大体步骤比较好,而后续就应该进入调用栈里深层次阅读了,基于这个矛盾关系,所以本文只保证基本的流程,而不保证调用栈的线性关系。所以本文主要以知识点为单位划分,这样在你亲自阅读源码时遇到疑惑的可以Ctrl+F直接搜索,也方便碎片化阅读。总之按着标题来选择感兴趣的内容是最佳的阅读方式,而非以线性的方式阅读本文。
(1)核心组件:Partitioner,用来决定每条消息是路由到Topic的哪个分区里
(2)核心组件:metadata,这个是对于生产端来说非常核心的一个组件,他是用来从broker集群去拉取元数据的Topics(Topic -> Partitions(Leader+Followers,ISR)),后面如果写消息到Topic,才知道这个Topic有哪些Partitions,Partition Leader所在的Broker
- 初始化拉取:初始化的时候,直接调用metadata组件的方法,去broker上拉取了一次集群的元数据过来
- 元数据定时更新:每隔一小段时间就再次发送请求刷新元数据,metadata.max.age.ms,默认每隔5分钟一定会强制刷新一下
- 元数据临时拉取:还有在发送消息的时候,如果发现你要写入的某个Topic对应的元数据不在本地,那么也会主动发送请求到broker尝试拉取这个topic对应的元数据,如果你在集群里增加了一台broker,也会涉及到元数据的变化
(3)核心参数:每个请求的最大大小(1mb),缓冲区的内存大小(32mb),重试时间间隔(100ms),缓冲区填满之后的阻塞时间(60s),请求超时时间(30s)
(4)核心组件:RecordAccumulator,缓冲区,负责消息的复杂的缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb)
- 默认情况下,如果光光是考虑batch的机制的话,那么必须要等到足够多的消息打包成一个batch,才能通过request发送到broker上去;
- 但是,如果你发送了一条消息,但是等了很久都没有达到一个batch大小,所以说要设置一个linger.ms(逗留时间),比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去
(5)核心组件:网络通信的组件,NetworkClient,一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)
(6)核心组件:Sender线程,负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1代表只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),sender线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动
(7)核心组件:序列化组件,拦截器组件
初始化KafkaProducer在初始化的时候是不会去拉取集群的元数据的,做了一个最最基本的初始化,也就是仅仅把我们配置的那个broker的地址放了进去,在客户端缓存集群元数据的时候,采用了哪些数据结构
发送https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
KafkaProducer.javadoSend
发送的
我们先大致来看一下KafkaProducer.send()方法发送消息的时候,他源码里大致的运行的流程,先来窥探一下
(1)回调自定义的拦截器
(2)同步阻塞等待获取topic元数据
如果你要往一个topic里发送消息,必须是得有这个topic的元数据的,你必须要知道这个topic有哪些分区,然后根据Partitioner组件去选择一个分区,然后知道这个分区对应的leader所在的broker,才能跟那个broker建立连接,发送消息
调用同步阻塞的方法,去等待先得获取到那个topic对应的元数据,如果此时客户端还没缓存那个topic的元数据,那么一定会发送网络请求到broker去拉取那个topic的元数据过来,但是下一次就可以直接根据缓存好的元数据来发送了
(3)序列化key和value
你的key和value可以是各种各样的类型,比如说String、Double、Boolean,或者是自定义的对象,但是如果要发送消息到broker,必须对这个key和value进行序列化,把那些类型的数据转换成byte[]字节数组的形式
(4)基于获取到的topic元数据,使用Partitioner组件获取消息对应的分区
(5)检查要发送的这条消息是否超出了请求最大大小,以及内存缓冲最大大小
(8)设置好自定义的callback回调函数以及对应的interceptor拦截器的回调函数
(7)将消息添加到内存缓冲里去,RecordAccumulator组件负责的
(8)如果某个分区对应的batch填满了,或者是新创建了一个batch,此时就会唤醒Sender线程,让他来进行工作,负责发送batch
刚开始他没有去拉取集群的元数据,而是在后面根据你发送消息时候的需要,要给哪个topic发送消息,再去拉取那个topic对应的元数据,这就是懒加载的设计思想,按需加载思想
对消息的大小,是否超出请求的最大大小,是否会填满 内存缓冲导致内存溢出,对一些核心的请求数据必然要进行严格的检查
异步发送请求,通过先进入内存缓冲,同时设置一个callback回调函数的思路,在发送完成之后来回调你的函数通知你消息发送的结果,异步运行的后台线程配合起来使用,基于异步线程来发送消息
- clusterAndWaitTime = waitonmetadata(record.topic(), record.partition(), maxBlockTimeMs);
- Cluster cluster = clusterAndWaitTime.cluster;
- serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key())
- serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
- int partition = partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
- int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers); - ensurevalidRecordSize(serializedSize);
- Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
- RecordAccumulator.RecordAppendResult result =
accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs); - this.sender.wakeup();
- return result.future;
maxBlockTimeMs,决定了你调用send()方法的时候,最多会被阻塞多长时间,所以这个方法决定了你的send在一些异常的情况下,比如说拉取topic的元数据,结果跟broker网络有问题,在一段时间后还是拉取不到
在你把数据放到内存缓冲的时候,如果内存缓冲满了,此时最多就只能阻塞这么长时间就必须返回了,如果你希望send()方法被阻塞的时间可以延长或者缩减,此时你可以自己去动手配置这个参数
在客户端的方法尝试等待获取topic元数据的过程中,核心的逻辑,就是说先必须唤醒Sender线程,然后呢就会通过一个while循环,直接去wait释放锁,尝试最多就是等待默认的60s的时间
topic元数据的拉取,是走的是异步的方式的,但是对异步的结果进行同步的阻塞的等待,他其实唤醒Sender线程,就是本质上就是在让那个Sender线程去从broker拉取对应的topic的元数据
如果拉取成功了,那么集群元数据的版本号version一定会累加,所以只要判断version版本号还没有累加,就说明此时Sender县城关还没有成功的拉取元数据,此时就是在主线程里,就是要wait阻塞等待最多60s即可
接下来肯定是分为两种情况:
(1)Sender线程成功的在60s内把topic元数据加载到了,然后缓存到了metadata里去,更新了version版本号,而且此时一定会尝试把wait阻塞等待的主线程给唤醒,让主线程直接返回阻塞等待的时长
(2)如果wait(60s)一直超时了,你的Sender线程都没加载成功元数据,此时人家在60s后自动醒来了,此时会直接超时抛异常
底层的网络通信组件,请求是如何发送的,响应是如何接收的,这些东西都对应了一套kafka底层的网络通信的一套东西,如果我们现在去琢磨和研究他的话,其实是不靠谱的,非常的复杂
没有必要去死扣底层的细节,只要知道在发送消息的时候,如果一个topic的元数据么有,此时会发送请求去broker拉取元数据以及缓存在客户端即可
既然已经有了元数据了,接下来就可以进行分区路由了,把每个消息要路由到某一个分区去,默认有一个DefaultPartitioner,这个里面他实际上是会去负责默认的分区路由的策略,支持三种,指定分区,指定分区key,或者不指定分区key
AtomicInteger,初始值是一个随机的integer类型的数字,接下来默认是递增的,一定会保证是一个正整数,就是比如说topic有5个分区,就会对这个递增的数字(23),对topic的分区数量进行取模
就是根据这个递增的数字23,路由到5个分区中的1个,3
接下来一条消息,就会递增counter,比如说就是24,4;25 -> 0;26 -> 1;27 -> 2;28 -> 3;29 -> 4;30 -> 0,所以说采用了一个counter递增的方式,不断的用一个递增的数字来对分区的数量进行取模
保证在不指定分区key的情况下,所有的消息会均匀的分发到各个分区中去
send()方法调用sender线程
KafkaThread封装sender,sender封装NetworkClient
逻辑和线程
NetworkClient.poll(time,now)
假设指定了分区key
此时,kafka会通过自己的工具类,murmur2,实现一个算法,将一个字节数组转换为一个hash值,你的分区key,比如说订单id就会被通过murmur2算法转换为一个int类型的hash值
只要你的分区key是一样的,比如说是同样的订单id,此时就一定会生成相同的hash值,接下来就用hash值对分区数量进行取模,就可以保证说只要是分区key相同,hash值一定也相同,路由到的分区一定是相同的
对于如果你要保证说发送出去的消息按照一定的规律严格是有序的,比如说mysql binlog就一定要严格按照这个模式来发送,就一定是要按照数据库里的表主键id来作为分区key进行发送
同一条数据的增删改的binlog都是进入到同一个分区的,才能拿到正确的顺序
如何将消息写入内存缓冲里面,先大致浏览一下里面的流程,然后再逐个击破一点一点的去看他,这个里面一定要关注的一点就是说,kafka客户端设计是如何管理自己的内存的,如何基于内存里的数据结构构造一个缓冲区
如何基于缓冲区去承载写入进去的消息,以及batch批处理的机制,消息聚合成batch的机制,整个这套机制是如何来实现的
KafkaProducer设计的理念就是多线程并发安全的,可以让多个线程并发的来调用KafkaProducer还保证数据不会错乱的,所以说是可能会有多个线程并发的来调用他的send()方法的
一个batch就对应了一块内存空间,这里要放一堆的消息,batchSize默认的大小是16kb,如果你的消息最大的值是1mb,如果说你的消息大于了16kb的话,就会使用你的消息的大小来分配一块内存空间
否则如果你的消息是小于16kb的话,那么就会基于16kb来分配内存空间
你在实际生产环境,request.max.size,batch.size是必须要调优的,你必须要根据自己实际发送的消息的大小来设置request.max.size和batch.size,如果你的消息频繁的是超过了batch.sizse的话
一个batch就一条消息,batch打包的机制还有意义吗?每条消息都对应一次网络请求
ReentrantLock他比较好的地方就是可以通过API灵活的控制加锁和释放锁,在这里,BufferPool这里是需要这样灵活的加锁和释放锁的,synchronized效果是一样,代码块的范围来加锁和释放锁
进入synchronized代码块就加锁,出这个代码块就释放锁
BufferPool里是有一个Deque作为队列,缓存了一些ByteBuffer,也就是缓存了一批内存空间,可以用来复用的,就是说他会缓存一批ByteBuffer,每个ByteBuffer都是16kb,默认的batch大小
Dequeu里的ByteBuffer的数量 * 16kb = 已经缓存的内存空间的大小,0
availableMemory就是剩余的还可以使用的内存空间的大小,32mb,此时需要使用掉一块内存空间,减去batchSize的大小,32mb - 16kb,接下来就直接返回ByteBuffer分配出来的一块16kb大小的内存空间
如果当前内存空间还可以分配新的ByteBuffer,那么就是上述的运行逻辑
tryAppend方法,其实是把消息尝试写入Dequeu的最近一个batch中,但是如果Dequeu是空的,这个方法会失败,并发课里讲解过,其实可能会有多个线程并发的执行,多个线程都可能分别拿到一个16kb的ByteBuffer
3个线程,线程1,线程2,线程3,这3个线程都会获取到一个16kb的ByteBuffer内存
假设线程2进入了synchronized代码块里面去,基于16kb的ByteBuffer构造一个batch,放入Dequeue中,就成功了
接着线程3进入了synchronized代码块里面去,直接把消息放入Dequeu中已有的一个batch里去,那么他手上的一个16kb的ByteBuffer怎么办?在这里就会把这个16kb的ByteBuffer给放入到BufferPool的池子里去,保证内存可以复用
SenderKafkaThread封装sender,sender封装NetworkClient
分区规则https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
Recordhttps://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/record/Record.java
缓存池入口还是KafkaProducer.doSend
然后看下面的步骤
RecordAccumulator.RecordAppendResult result =
accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
然后我们进入accumulator.append()方法
https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
RecordAccumulator这个类我们主要关注append()方法,他会多次使用tryAppend()方法来将消息放入队列,但是tryAppend()方法有个问题是它不会创建batch,而之会用现成的batch(新创建的或者别的MQ没有用完的空间),所以需要append()来完成创建batch的操作
batch、队列、内存池的关系内存池>分区的队列>batch
给batch分配内存而free队列是整个kafka共享的
入口:doSend–>RecordAccumulator.append–> free.allocate(size, maxTimeToBlock);
代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
batch引用着每个小buffer
分配batch大小的时候会分两种情况来分配:
- 正常batch:如果16KB够用,就分配16KB。并且如果free队列有空batch,优先使用空batch,没有空batch则缩减nonPooledAvailableMemory
- 大batch:如果MQ大小>16KB,则使用MQ大小作为batch。永远跟free队列无关,只使用nonPooledAvailableMemory,并且如果nonPooledAvailableMemory不够用还会尽可能少地释放free队列的batch
- 如果最终还不够用,会阻塞添加进waiters,如果超过maxBlockMs就抛异常。当有batch内存放回来bufferPool的时候就唤醒waiters。具体后面讲
分配完batch就可以调用batch.tryAppend()成功了
回收:
- 正常batch:回收到free队列
- 大batch:使用立即回收,直接进入内存池
// 如果是16KB,clear后放到free列表中
if (size == this.poolableSize && size == buffer.capacity()) {
buffer.clear();
this.free.add(buffer);
} else {
// 如果不是16KB,如1M,直接放回nonPooledAvailableMemory,而不是放回内存池
this.nonPooledAvailableMemory += size;
}
双重检查:2个线程都去申请内存时,因为有如下的double check,所以不会分配2个batch,但是问题在于已经分配过空间了,那么线程2的空间应该释放掉。在2.1版本中解决方案是:如果空间分配过了,buffer对象被batch引用,然后buffer=null赋值为空。在finally块中,if(buffer!=null)释放buffer。这块代码去这里查看吧。https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
申请不到够大的batch如果最终还不够用,会阻塞添加进waiters,如果超过maxBlockMs就抛异常。当有batch内存放回来bufferPool的时候就唤醒waiters。
- 创建condition:Condition moreMemory = this.lock.newCondition();
- 添加进waiters:waiters.addLast(moreMemory);
- while(获得大小<需求大小)
- 是否超时 = !await(时间)
- 谁来通知await:sender完成
- await超时的话抛异常
- 优先是否free队列,free队列没了的时候再拿nonPooledAvailableMemory
- 是否超时 = !await(时间)
Condition moreMemory = this.lock.newCondition(); this.waiters.addLast(moreMemory);给batch追加MQ
上个知识点:给batch分配内存 以及 获取还有空间的batch
相关代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
入口:doSend–>申请buffer–>封装buffer为batch–>将MQ添加进batch–>ProducerBatch.tryAppend()
队列tryAppend的线程安全由上层的synchronized (dq)保证,也就是对batch操作要锁队列
在kafka中队列被CopyOnWriteMap封装着,Map<分区,队列>,它使用了读写分离的思想,只有新创建队列时才是写,下面几种说法等价
- put时为写
- 为分区创建队列时才是put,为队列添加batch并不是put
https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
还是跟随消息收集器来追溯,分区与batch的对应关系是通过CopyOnWriteMap来保存的
// 而RecordAccumulator构造器中 batches = new CopyOnWriteMap>();
CopyOnWriteMap是自己定义的,实现线程安全和读写分离
public class CopyOnWriteMapimplements ConcurrentMap { private volatile Map map; @Override public synchronized V put(K k, V v) { // 开辟新的空间实现读写分离 Map copy = new HashMap (this.map); // 插入数据 V prev = copy.put(k, v); // 替换 this.map = Collections.unmodifiableMap(copy); return prev; }
要发送消息时,通过map.get(分区)获取对应的队列,没有则创建队列,创建分区的队列就是map.put(分区,队列)
他会基于BufferPool给这个batch分配一块内存出来,之所以说是Pool,就是因为这个batch代表的内存空间是可以复用的,用完一块内存之后会放回去下次给别人来使用,复用内存,避免了频繁的使用内存,丢弃对象,垃圾回收
已经可以往Deque队列里写入消息了,已经有一个新分配的batch了(对应了BufferPool分配的一块内存空间)
KafkaProducer设计的理念就是多线程并发安全的,可以让多个线程并发的来调用KafkaProducer还保证数据不会错乱的,所以说是可能会有多个线程并发的来调用他的send()方法的
他会从内存缓冲区里获取一个分区对应的Deque,这个Deque里是一个队列,放了很多的Batch,就是这个分区对应的多个batch,CopyOnWrite这个东西,我们在并发课程里,讲解过CopyOnWriteArrayList
就是说,适合的是读多写少的场景,每次更新的时候,都是copy一个副本,在副本里来更新,接着更新整个副本,好处就在于说写和读的操作互相之间不会有长时间的锁互斥,写的时候不会阻塞读
坏处在于说对内存的占用是很大的,适合的是读多写少的场景,大量读的场景就直接基于快照副本来进行读取的,CoypOnWriteMap也是类似的思路,一个分区创建一个Deque,其实是频次很低的写行为
大量的主要还是在读取,就是去大量的从map里读取一个分区对应的Deque,最后高并发频繁更新的就是分区对应的那个Deque,读的时候基于快照来读即可,所以这种场景非常适合使用CopyOnWrite系列的数据结构
就是说,适合的是读多写少的场景,每次更新的时候,都是copy一个副本,在副本里来更新,接着更新整个副本,好处就在于说写和读的操作互相之间不会有长时间的锁互斥,写的时候不会阻塞读
坏处在于说对内存的占用是很大的,适合的是读多写少的场景,大量读的场景就直接基于快照副本来进行读取的,CoypOnWriteMap也是类似的思路,一个分区创建一个Deque,其实是频次很低的写行为
大量的主要还是在读取,就是去大量的从map里读取一个分区对应的Deque,最后高并发频繁更新的就是分区对应的那个Deque,读的时候基于快照来读即可,所以这种场景非常适合使用CopyOnWrite系列的数据结构
如果说此时还没有创建对应的batch,此时会导致放入Deque会失败
他会基于BufferPool给这个batch分配一块内存出来,之所以说是Pool,就是因为这个batch代表的内存空间是可以复用的,用完一块内存之后会放回去下次给别人来使用,复用内存,避免了频繁的使用内存,丢弃对象,垃圾回收
已经可以往Deque队列里写入消息了,已经有一个新分配的batch了(对应了BufferPool分配的一块内存空间)
他内存中就是一个最最普通的,非线程安全的Map数据结构,但是他把这个数据结构定义为volatile类型,就可以保证可见性,就是只要有人更新了这个引用变量对应的实际的map对象的地址,就可以立即看到
读的时候是完全不用加锁的,多个线程并发进来,高并发的执行读的操作,在这里完全是没有任何的互相之间的影响的,可以实现高并发的读,没有锁在这里。如果队列已经存在了,直接返回即可
多个线程会并发的执行putIfAbsent方法,在这个方法里可以保证线程安全的,除非队列不存在才会设置进去,在put方法的时候是有synchronized,可以保证同一时间只有一个线程会来更新这个值
为什么说写数据的时候不会阻塞读的操作,针对副本进行kv设置,把副本通过volatile写的方式赋值给对应的变量,并发之类的课要学精,否则学大数据无从谈起,跳看,直接来看kafka源码,一定是搞不定的
batch发送时机可以看下面的batch是否该发送了的小节
前提知识点:给batch追加MQ
入口:doSend–>给batch追加MQ,即accumulator.append(–>唤醒sender线程–>异步线程sender.run
从下面的代码可以看出来,是batch满了或者后一个batch已经被创建好了
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
batch什么时候满
ProducerBatch.tryAppend里有recordsBuilder.hasRoomFor(
是介于MemoryRecordsBuilder.java这个类来判断的,总体就一句this.writeLimit >= estimatedBytesWritten() + recordSize;即容量<需要的大小
然后buffer.flip从写转成读
发送MQ前提知识点:batch发送时机
入口:给batch追加MQ后唤醒sender线程–>sender继续执行run
sender.run()
void run(long now) {
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
}
所以我们进入sendProducerData(now);方法,去该发送批次要发送那些batch
准备发送内容前提知识点:发送MQ
入口:入口:给batch追加MQ后唤醒sender线程–>sender继续执行run–>准备发送的内容,即Sender.sendProducerData–>RecordAccumulator.ready
遍历所有的分区队列–>找到leader
- 找到leader:取出队头batch,进行判断batch是否该发送了
- 也就是说,是否可以发送是以leader为单位的,准备好也是readyNodes.add(leader)
- 判断batch是否可以发送出去不是判断哪个batch可以发送了,也不是用分区来判断的,而是判断broker
- 没找到leader:unknownLeaderTopics.add(part.topic());
把判断为发送的batch添加到集合readyNodes.add(leader);
发送的单位对每个Broker都创建一个ClientReqeust,包括了多个Batch,就是在这个Broker上的多个Leader Partition所对应的Batch,聚合起来组成一个ClientRequest,形成一个请求,发送到Broker上去
如果此时判断出来这个Batch是可以发送出去的,此时就会将这个Batch对应的那个Partiton的Leader Broker给放入到一个Set里去,他在这里找的不是说找哪些Partition可以发送数据,也不是找Batch
他在这里找的是哪些Broker有数据可以发送过去,而且通过Set进行了去重,可能对于一个Broker而言,是有多个Partiton的Batch可以发送过去的
代码编写的技巧,如果你的方法要返回的是一个复杂的数据结构,此时可以定义一些Bean,里面封装你要返回的数据,哪些Broker可以发送数据过去,下一次来检查是否有Batch可以发送的时间间隔,是否有Partiton还不知道自己的Leader所在的Broker
判断batch是否该发送了这里不要考虑是否支持重试,全部支持重试
总的判断为
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
- full:batch满的batch
- 该batch不是队尾了 || batch的full标志位为true
- expired:超过了重试和逗留时间,这两个事件用timeToWaitMs整体表示
- 第一次:batch的attempts参数为0,所以不是重试,timeToWaitMs=逗留时间。看的是逗留时间
- 第二次:失败时间<要求的重试时间,则不重试,防止重试太频繁timeToWaitMs=逗留时间;超过时间可以重试了,则timeToWaitMs=重试时间
- exhausted:有waiters申请不到内存了,赶紧发送出去吧
- closed:生产者关闭了
- flushInProgress:是否有现车在等待flush
判断为该发送后添加的是broker,而不是batch和分区
逗留时间和重试时间- 逗留时间:lingerMs逗留时间到了必须发送出去,默认是0,表明不需要等待,来一条消息就发送一条消息,很明显不合适。所以我们在发送数据的时候,一定记得去配置这个参数,假设配置100ms,表示最多等待100ms后必须发送出去
- 第一次发送数据:attempts()==0->backingOff=false,截止时间为逗留时间lingerMs。
- 第二次发送数据:重试时间还没到也是逗留时间
- 重试时间:第一次发送MQ失败后,必须超过重试间隔时间后才能再次重试,防止重试太频繁
前提知识点:判断batch是否该发送了
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;筛选可以发送的batch
前提知识点:准备发送内容
入口:给batch追加MQ后唤醒sender线程–>sender继续执行run–>准备发送的内容,即Sender.sendProducerData–>RecordAccumulator.ready
因为有的分区没找到leader信息,所以要把这些分区的batch去除
网络是否准备好入口:sender要发送MQ
代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
NetworkClient.ready()
- 先从缓存中看是否已经有该node的链接了
- 缓存中没有则尝试建立连接该node
kafka网络设计网络没有建立好会发送MQ吗?
发送MQ前会检查网络是否建立好,没有建立则先建立网络。
重新回到Sender.sendProducerData()里的ready逻辑
第一次进入ready时,虽然会初始化网络,但是还是return false,代表没有ready,从而remove掉该node,代表不发送MQ
最后回到sender.run()的第二个方法 client.poll,这时候不不发送,所以下一次循环run才发送
总结:初始化网络和发送MQ不再一次run中执行
sender拉取元数据和发MQ都涉及网络
主要类是NetworkClient
前提知识点:网络是否准备好
入口:构建sender线程–>封装NetworkClient–>封装selector,即NIO
使用:sender发送MQ或拉取元数据
代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/network/Selector.java
public class Selector implements Selectable {
private final java.nio.channels.Selector nioSelector;
初始化链接
前提知识点:网络是否准备好
代码:https://gitee.com/HanFerm/kafka-source-2.1.0/blob/master/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
// channel.connect(address);
boolean connected = doConnect(socketChannel, address);
SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
}
但是第一次只是创建了channel,还没有建立连接,所以取看client.poll,在里面有selector.poll来建立连接
二进制写ByteBuffer- offset
- size
- crc
- magic valye
- attribute
- timestamp
- key.length
- key
- value.length
- value
ByteBufferOutputStream包裹了ByteBuffer,持有一个针对ByteBuffer的输出流,接着会把ByteBufferOutputStream给包裹在一个压缩流里,gzip、lz4、snappy,如果是包裹在压缩流里,写入的时候会先进入压缩流的缓冲区
压缩流会把一条消息放在缓冲区里,用压缩算法给压缩了,再写入底层的ByteBufferOutputStream里去
如果是非压缩的模式,最最普通的情况下,就是DataOutputStream包裹了ByteBufferOutputSteram,然后写入数据,Long、Byte、String,都会在底层转换为字节进入到ByteBuffer里去
内存空间管理的方式,包括他有内存缓冲的核心数据结构,内存缓冲池,ByteBuffer,如何通过IO流将数据写入ByteBuffer的,如何按照二进制协议规范来写一条消息的



