在生产者发往kafka集群的过程,在上篇生产者初级使用中已经简单介绍了,这里在详细介绍一下相关原理和高级特性。
一. 生产者原理先更着源码了解一下生产者发送消息的原理。
在上一篇文章中发送消息代码如下:
ProducerRecordrecord = new ProducerRecord<>(Topic, msg); kafkaProducer.send(record);
这里是调用send方法进行发送的:
@Override public Futuresend(ProducerRecord record) { return send(record, null); }
在消息的发送的过程中(kafkaProducer初始化的事情,具体可以查看上一篇文章),涉及到了两个线程(其中一个是守护线程):
主线程:在初始化生产者的时候会创建一个双端队列RecordAccumulator用来存放消息Sender线程(守护线程):Sender线程也是在生产者的初始化中构建并启动,Sender线程会不断的从RecordAccumulator中拉取消息发送到Kafka Broker中。
下面详细看一下整体流程!
1.1. 第一阶段这一个阶段消息到达生产者的时候,会经过拦截器、序列化器和分区器这三个组件。
先经过拦截器,拦截器通过 interceptor.classes指定,是可选的。
@Override public Futuresend(ProducerRecord record, Callback callback) { // 拦截器使用 ProducerRecord interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
接着到doSend方法中看一下首先就是获取broker的元数据(记录Kafka集群分区主题等相关信息,后续的文章介绍Kafka的元数据更新流程)
waitOnmetadata方法会返回拉取元数据所耗费的时间;并且在消息发送时的最大等待时间时会扣除该部分损耗的时间。接着会对key和value进行序列化:
然后会根据分区负载算法计算本次消息发送该发往的分区,默认的是使用DefaultPartitioner,也可以通过partitioner.class设置。
这里需要注意下一个tp = new TopicPartition(record.topic(), partition);这里会将主题和具体的分区组成一个对象作为key去获取指定的双端队列进去数据写入。
剩下的就是计算消息大小判断是否超过配置,设置事务等
这里可以设置:
max.request.size:最大请求字节大小,默认是1Mbuffer.memory:RecordAccumulator缓冲大小默认是32M
最后就是将消息追加到缓存区:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
上面看的大体流程如下图所示:
这个分成第二个阶段是因为在涉及到RecordAccumulator里面的处理稍微复杂点,拿出来详细说一下。
这里需要重点注意下RecordAccumulator#append方法,根据主题与分区获取一个双端队列,如果不存在,则创建一个,然后调用 tryAppend 方法将消息追加到缓存中。成功之后直接返回。
消息批次ProducerBatch的大小可以通过设置:batch.size,默认是16k。
如果appendResult返回的时候null的话,因为默认的abortOnNewBatch是true,此时说明当前分区没有可用的批次,就需要更换一个分区再次尝试。
此时到KafkaProducer#doSend切换分区,会将abortOnNewBatch是false将不会再次尝试。
此时再次进入RecordAccumulator#append此时说明第一次追加数据的时候是失败的,需要创建一个新的批次,需要先申请大小为batch.szie内存空间,由于没有足够的空间,如果在maxTimeToBlock时间内没有申请到内存,就会抛出异常。这里使用的是内存池,后面再介绍。
接着先创建一个新的ProducerBatch,将消息追加到ProducerBatch,然后将ProducerBatch添加到双端队列的队尾,然后将当前ProducerBatch添加到incomplete中(incomplete中放着的是存放未完成发送到服务器消息ProducerBatch,只有到发送线程Sender发送成功之后,就会将方法ProducerBatch删除并释放内存)。
第三部分大体流程如下图所示:
第三阶段介绍一下Sender发送消息到服务器的过程,从第一、二阶段可以知道双端队列不为空、当前ProducerBatch已满或者创建新的ProducerBatch才会去唤醒Sender线程去发现消息:
接下来我们看一下如何发送消息的,Sender是守护线程,直接去看一下Sender#run()方法,大体分成四部分:
发送消息到服务器,运行直到关闭被调用
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
非强制关闭的情况下,可能存在消息等待发送,需要使用runOnce方法将消费发动
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
非强制关闭的情况下,事务处理(后续介绍)
// Abort the transaction if any commit or abort didn't go through the transaction manager's queue
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
强制关闭,关闭事务和释放缓冲区
if (forceClose) {
// We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
最后关闭网络连接
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
接下来重点看一下Sender#runonce()方法,其中事务的相关的先跳过(后续介绍),重点关注一下:sendProducerData方法。这里的Sender#sendProducerData()是用来创建发送到Kafka集群的请求,真正发送请求处理返回的是client.poll() 方法。
所以发送消息的时候需要根据当前时间,判断缓冲队列那些主题的分区达到发送的条件,如果发送的消息没有找到路由信息,需要先去服务器拉取主分区信息。
// 获取集群信息 Cluster cluster = metadata.fetch(); // 获取分区已经准备好发送的节点、未知leader的主题的集合和不能发送分区准备就绪的最早时间 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
接着需要根据未知主分区的主题,标记更新元数据
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
遍历所有准备好的节点,将不可用的节点删除,并且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。
// remove any nodes we aren't ready to send to Iteratoriter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
接着就会对准备好的节点进行数据取出,封装为Map
// create produce requests Map> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
这里还有一个guaranteeMessageOrder,这个属性是通过配置:max.in.flight.request.per.connection(单个连接上发送的未确认请求的最大连接数,默认是5),这个属性是确保顺序消费的关键,在Kafka中可以保证同一个分区中的消息是有序的,如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们;但Producer发送消息过程发生重试时,而max.in.flight.requests.per.connection参数配置为大于1的值,那么就会出现错序的现象。所以在需要保证顺序消费的场景下max.in.flight.requests.per.connection配置为1
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
guaranteeMessageOrder这个值在初始化的时候进行设置的。
接着将把batches转为按照Map
private void addToInflightBatches(Listbatches) { for (ProducerBatch batch : batches) { List inflightBatchList = inFlightBatches.get(batch.topicPartition); if (inflightBatchList == null) { inflightBatchList = new ArrayList<>(); inFlightBatches.put(batch.topicPartition, inflightBatchList); } inflightBatchList.add(batch); } } public void addToInflightBatches(Map > batches) { for (List batchList : batches.values()) { addToInflightBatches(batchList); } } addToInflightBatches(batches);
接着是筛选已过期的批次,这个筛选是通过配置的参数delivery.timeout.ms,默认是120s进行判断的。
accumulator.resetNextBatchExpiryTime(); ListexpiredInflightBatches = getExpiredInflightBatches(now); List expiredBatches = this.accumulator.expiredBatches(now); expiredBatches.addAll(expiredInflightBatches);
接着就是处理已经超时的消息批次,返回失败信息,然后生成下一次延迟发送的时间,并构建发送请求。
下面在看一下Sender#sendProduceRequests方法,这里按照节点ID对应批次消息构建发送请求,每一个节点ID将会对应多个 ProducerBatch 一起封装成一个请求进行发送,同一时间,对应服务器的节点连接只会只能发送一个请求,需要注意这里只是用于构建请求,会将数据排队待发送中,并不是真正的数据发送。
private void sendProduceRequests(Map> collated, long now) { // 这里涉及的ACK消息确认机制后面后详细介绍 for (Map.Entry > entry : collated.entrySet()) sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue()); }
需要注意在Sender#sendProduceRequest方法中其实又会将List
最后构建请求,并将数据send,进行排队等待发送。
接着在client.send(clientRequest, now)中看看是如何放入等待队列中的,具体的看NewWorkClient#doSend()方法中,这个方法其实也是做了一些前置的处理。真正增加到等待队列的是另一个doSend方法中。
在这个doSend方法中会构建Kafka 协议中请求的头信息,构建NetworkClient.InFlightRequest,最后将请求放到请求选择器中的Map
这里的inFlightRequests是用来存放当前正在发送或等待响应的一组请求,这个数量是可以通过参数:max.in.flight.request.per.connection单个连接上发送的未确认请求的最大连接数,默认5。其中的canSendMore在前面介绍过。
在选择器中的Selector#send方法中会通过节点ID获取到对应KafkaChannel(相当于NIO网络通讯Channel部分)。这里放入之后就会开始调用NetworkClient#poll方法进行请求的放松。
后面是相关的网络的发送就不是本文的重点了,有时间再去查看一个Kafka网络相关的内容吧。
1.4. 示意图关于kafka的整个执行大体流程可以看下面的示意图:
可以修改或者提高下面的这些参数:
batch.size:批量发送大小,默认是16384(16k)linger.ms:发送延迟时间,默认是0,可以提高到5-100ms,注意这个参数配置会增大延迟。compression.type:压缩类型,默认是producer(未压缩),还有其他的配置类型:gzip(压缩率高,适合高内内存和CPU)/snappy(适合带宽敏感,压缩力度大)/lz4/sztd,可以修改为snappybuffer.memory:生产者最大可用的缓冲,默认是33554432(32M) 三. 数据可靠性
在上面了解kafka生产者流程的时候在构建ClientRequest配置的ACK参数,这个ack的值可以通过参数acks进行配置,默认是1
在介绍ack之前先了解一下kafka的分区/副本和ISR队列。
分区机制是kafka实现高吞吐的关键,副本机制是为了实现高可用。
创建一个主题:test-topic-1,设置分区partitions为3,副本因子replication-factor是2。这里着重说一下副本因子的含义是:将任意一个分区复制到2个broker上,其中一个leader另一个是follower就是这样,说明只有一个副本。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test-topic-1
查看kafka集群的分区和副本的分配结果:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic-1
结果如下图
从上面的分区和副本,可以知道如下的几点:
当副本因子(replicaation-factor)为N且N大于1时,每个分区都会有N个副本,这些副本由leader和follower组成,并且所有分区的副本都会均匀的分布在所有的节点(Borker)上一个分区的多个副本中只有一个leader和多个follower,生产者往分区中写入数据只会在leader上写入,然后数据才会被复制到其他的followe中。Kafka的follower会周期性的pull(拉取数据,类似与消费者消费过程),数据读取也是发生在leader上,follower只是一个数据备份,保证leader出现故障的做切换,并不往外提供服务。 3.2. ISR队列
从上面知道一个分区包括大于等于1个副本,这些副本中只会有一个leader,其他的都是follower,leader负责读写,follower副本同步leader数据。当生产者发送消息到broker,Kafka会采用leader+follower完全同步机制(ack=-1/all);但此时存在如下问题,例如一个分区有1leader+5follower,此时leader和其中三个follower都同步完全了,其他两个follower同步很慢或者其中有一个挂掉了,会就造成ack一直无法返回。
所以kafka采用不是完全同步,也不是完全异步,而是采用ISR(In Sync Replica),Kafka会维持一个与其保持同步的replica集合,该集合就是ISR队列(这个队列是由leader和于leader保持同步的follower组成的,如下图),每个分区都一个ISR集合,由leader动态维护,当leader宕机之后,新的leader选出也是从ISR队列中选出。
可以通过配置参数:min.insync.replicas设置最小副本数量,默认值是1,这个和ack=1的效果是一样的,仍然有丢数的风险。
如果某一个follower长时间未向leader发送通信请求或者同步数据,这个follower将会被提出ISR队列中。这个时间阈值可以通过参数:replica.lag.time.max.ms设置,默认是30s。
3.3. ACK机制在上面看生产者发送消息的源码中,可以知道在生产者发送消息到Broker的时候,会有传一个ack的参数,这个参数有三种类型的值,下面分别看一下各个值的作用:
ack=0:表示producer无需要等待来自broker的确认就可以发送下一批数据,这种情况传输效率是最高的,但是也是意味着数据的可靠性是最低的。ack=1:表示producer在ISR队列中的leader确认接收到数据之后就可以发送下一批数据了。ack=-1/all:表示producer需要等待ISR中所有follower都确认接受到数据之后才算一次发送完成,可靠性是最高的,但是也无法确保数据不丢失。
下面对ack是1和-1的情况进行详细分析一下,了解下数据丢失的情况:
3.3.1. ACK等于1ACK等于1的情况如下:生产者发送消息到分区的leader,leader写到磁盘,并返回给客户端成功的消息但是此时ISR中的副本还没有来得及来取该消息,如果此时leader宕机,follower就会发生选举,选出新的leader之后,但是由于旧的leader在宕机之前,follower并没有拉取到最新的消息,此时就会造成数据丢失的问题。
ACK设置为-1/all情况如下:生产者发送消息到分区的leader,leader写到磁盘,并且同步到ISR队列中的follower之后,返回ack,此时就算leader宕机,会从ISR队列中选择一个为leader,数据不会丢失,但是可能会出现数据重复。
如果要实现数据完全可靠的条件:
ACK级别设置为-1/all分区副本大于等于2ISR队列配置的应答的最小副本数量大于等于2 3.5. 可靠性总结
针对ack的三种参数设置:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;一般很少用到;acks=1,生产者发送过来数据需要leader应答,可靠性中等,效率也是中等;一般用于传输普通日志,允许个别数据丢失;acks=-1/all,生产者发送过来数据需要leader和ISR队列中的所有follower应答,可靠性高,效率低;一般用于传输重要数据(诸如钱之类重要数据),对可靠性要求很好的场景 四. 数据的重复问题
数据传递的语义:
至少一次(At Least Once):可以保证数据不丢失,但是不能保证数据不重复;ack设置为-1/all,分区副本大于等于2,ISR队列应答的最小副本数据量大于等于2最多一次(At Most Once):可以保证数据不重复,但是不能保证数据不丢失;ack设置为0精确一次(Exactly Once):保证数据不丢失也不重复 4.1. acks=-1/all存在的问题
在上面一个部分了解下数据可靠性问题,针对acks=-1/all的情况虽然可以保证数据可靠性,但是会出现数据重复的问题,下面看一下为什么会出现数据重复的问题?
当消息发送到leader的时候,ISR中的follower进行拉取同步的过程中,如果leader突然宕机,kafka会在ISR队列中选择一个follower成为新的leader,如果此时被选为leader的follower已经同步了一部分数据,此时如果客户端发生重试就会出现数据重复的问题。发生次情况的概率较低,大体流程如下图:
针对这个数据重复的问题,可以通过幂等性和生产者事务去解决。
幂等性就是指生产者无论向Broker发送多少重复数据,Broker都只会持久化一条,保证了不重复。其实就是将去重的操作从下游系统提升到上游Kafka中进行。
Kafka通过会针对
Pid:在每个新的Producer初始化时,会被分配一个唯一的PID,这个PID对客户端使用者是不可见的;Partition:表示分区号;Sequence Number:对于每个PID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值;
但是需要注意重启之后这个PID就会发生变化,Broker就不认识了,这就意味着幂等性是只能在单次会话内实现并且也不能多分区。
开启幂等性将参数:enable.idempotence设置为true,默认是就是true。
4.3. 生产者事务因为上面的幂等性会因为生产者重启发生变化,也是无法避免发生数据重复的问题,如果要实现一条数据都不发生重复那么就要配合生产者事务来实现。
注意:开启事务,必须开启幂等性,在使用事务之前,必须先自定义一个唯一的transactional.id。有了transactional.id,即使客户端挂掉了,他重启也能继续处理未完成的事务。
这里需要注意:__transaction_state使用用来存储日志消息,这个主题默认有50个分区,每个分区负责一部分事务。事务划分是根据transactional.id的hashcode值%50,计算出该事务属于哪个分区。该分区Leader副本所在broker节点即为transactional.id对应的Transaction Coordinator节点。
下面看一下怎么使用事务发送消息:
先增加一个:transactional.id的配置
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test_tid");
发送消息的时候开启事务:
producer.initTransactions(); // 初始化事务
producer.beginTransaction(); // 事务开始
try {
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord<>("test-topic-1", "这是第" + i + "条消息");
producer.send(record);
}
// int i = 1/0; 模拟异常的情况
producer.commitTransaction(); // 事务提交
} catch (Exception e) {
producer.abortTransaction(); // 发生异常,事务回滚
}
五. 数据的顺序性
从上面知道kafka的生产者生产的数据就算是有序的,写入对应主题,因为一个主题有多个分区,所有写入数据之后数据是无法保证顺序;另外在上面分析生产者原理的过程中,生产者的中的InFlightRequests会缓冲最大5(max.in.flight.request.per.connection默认值是5)个请求,这就会出现当5个请求中有一个请求发生错误,重试之后就回request请求乱序。
kafka在1.x版本之前保证数据单分区有序,只能设置max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
kafka在1.x及以后版本保证数据单分区有序,分成两种情况:
在没有开启幂等性,设置max.in.flight.requests.per.connection为1在开启幂等性后,设置max.in.flight.requests.per.connection为小于等于5;这是因为kafka服务端会缓冲生产者发来最近的5个request元数据,所以无论如何,都可以保证最近的5个request的数据都是有序的。 六. 整合Springboot
创建Springboot项目增加依赖:
org.springframework.kafka spring-kafka
增加生产者相关的配置:
spring:
kafka:
bootstrap-servers: 192.168.31.32:9091,192.168.31.32:9092,192.168.31.32:9093
producer:
batch-size: 16384 #16K
buffer-memory: 33554432 #32M
acks: -1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
发送消息:
@Slf4j
@RestController
@RequestMapping("kafka")
public class HelloController {
private final static String TOPIC_NAME = "test-topic-1";
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, "key-1", "hello world").addCallback(success -> {
log.info("{}-生产者发送消息成功:{}", TOPIC_NAME, success);
}, failure -> {
log.info("{}-生产者发送消息失败:{}", TOPIC_NAME, failure);
});
}
}
这里的send方法有如下几种:



