1. Producer Message Sending
2. Source Code Analysis
2.1 Initialization of KafkaProducer
2.2 Message Sending in KafkaProducer
1. Producer Message Sending

In Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析 the author mentioned that one of the important reasons for Kafka's high performance is that load balancing is implemented on the client side, and this is also reflected in the design of the Kafka producer. The typical structural relationship between a Kafka cluster and its producers is shown in the figure below.
2. Source Code Analysis

2.1 Initialization of KafkaProducer

The rough flow of a producer sending a message to the Kafka broker is as follows:
1. Hash the key of the message and take that value modulo the number of partitions of the target topic to determine the partition the message should be sent to.
2. Once the target partition is determined, use the cluster metadata to locate the broker node hosting that partition's leader replica and send it a Produce request.
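Before diving into the source, here is a minimal usage sketch of the producer API that the rest of this section analyzes; the broker address, topic name, key and value are illustrative assumptions rather than values taken from the source:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerQuickStart {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumed broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // records with the same key are hashed to the same partition of the target topic
            producer.send(new ProducerRecord<>("demo-topic", "order-42", "hello"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to partition %d at offset %d%n",
                                    metadata.partition(), metadata.offset());
                    });
            producer.flush();
        }
    }
}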
KafkaProducer creates quite a few components during initialization; the key ones are:
- partitioner: the message partitioner, DefaultPartitioner by default, responsible for assigning messages to the partitions of a topic
- accumulator: the record accumulator, responsible for converting messages into a specific structure and buffering them
- metadata: a ProducerMetadata object that stores the Kafka cluster metadata. The subsequent call to this.metadata.bootstrap() initializes the cluster nodes from the bootstrap.servers config. While the Kafka client is running, every network request to the broker checks whether the cluster metadata needs to be refreshed; the component involved in that action is the metadataUpdater
- sender: a Sender object created by KafkaProducer#newSender(); it wraps the underlying NetworkClient and is responsible for issuing network requests and handling responses
- ioThread: a KafkaThread object which, once started, runs sender as its task
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
try {
this.producerConfig = config;
this.time = time;
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
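// message partitioner; defaults to DefaultPartitioner when partitioner.class is not configured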
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
this.transactionManager = configureTransactionState(config, logContext);
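// RecordAccumulator buffers records per partition as ProducerBatch objects until the Sender drains them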
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
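// the Sender wraps the underlying NetworkClient and runs as the task of a daemon I/O thread (KafkaThread)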
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
The source of KafkaProducer#newSender() is fairly straightforward; the key logic is:
1. First create the NetworkClient object. For how this object drives the underlying network I/O, readers can refer to Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析.
2. Use the NetworkClient object created in the previous step to create the Sender object.
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = configureInflightRequests(producerConfig);
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
metadata,
clientId,
maxInflightRequests,
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
}
2.2 Message Sending in KafkaProducer

Once KafkaProducer is initialized, KafkaProducer#send() can be called to start sending messages. As shown below, the core of this method is simply a call to KafkaProducer#doSend().
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
KafkaProducer#doSend() is a key method; its core flow is as follows:
1. First call KafkaProducer#waitOnMetadata() to make sure the cluster metadata is ready; if it is not, the calling thread blocks here and waits.
2. Call KafkaProducer#partition() to compute, via the message partitioner, the topic partition the message will be written to.
3. Call RecordAccumulator#append() to wrap the message into a ProducerBatch and buffer it in the record accumulator.
4. If the amount of data held in a partition's ProducerBatch reaches the threshold, call Sender#wakeup() to wake up the underlying network client to perform network I/O.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}",
                        record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }
        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        // producer callback will make sure to call both 'callback' and interceptor callback
        if (tp == null) {
            // set topicPartition to -1 when null
            tp = ProducerInterceptors.extractTopicPartition(record);
        }
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
        // The onCompletion callback does expect a non-null metadata, but one will be created inside
        // the interceptor's onCompletion implementation before the user's callback is invoked.
        interceptCallback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
The main job of KafkaProducer#waitOnMetadata() is to make sure the cluster metadata is ready. Its key logic is as follows:
1. First call Metadata#fetch() to get the cluster information and check the target topic of the message being sent; if the topic is marked invalid, an exception is thrown immediately and processing stops.
2. Check whether the target partition of the message exists in the cluster metadata. If it does, the method is done; otherwise proceed to the metadata-update steps below.
3. To update the metadata, first call ProducerMetadata#requestUpdateForTopic() to set the update flag, then call Sender#wakeup() to wake up the underlying network client, which sends a Metadata request to the Kafka broker to synchronize the cluster metadata.
4. Call ProducerMetadata#awaitUpdate() to sleep and wait for the metadata update to complete.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Cluster cluster = metadata.fetch();
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
metadata.add(topic, nowMs);
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long remainingWaitMs = maxWaitMs;
long elapsed = 0;
// Issue metadata requests until we have metadata for the topic and the requested partition,
// or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic, nowMs + elapsed);
int version = metadata.requestUpdateForTopic(topic);
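// wake the Sender so the underlying NetworkClient issues a Metadata request to the broker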
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
Back to step 2 of the doSend() flow above: once the cluster metadata is ready, KafkaProducer#partition() is called to compute the target topic partition via the message partitioner. If no partitioner is specified through the partitioner.class config, the core of this step is a call to DefaultPartitioner#partition() to determine the message's target partition.
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
The implementation of DefaultPartitioner#partition() has two key points:
1. If the message has no key, StickyPartitionCache#partition() is called to pick a partition (a randomly chosen partition that is reused until a new batch is created, i.e. the sticky partitioning strategy).
2. If the message has a key, the partition is determined by hashing the key and taking the remainder. This is the basis of the common practice of sending messages with the same identifier to the same partition so that those messages can be consumed in order.
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
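To make the keyed case concrete, the short sketch below reproduces the same murmur2-modulo computation for a keyed record; the key bytes and the partition count of 6 are assumptions for illustration:

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyedPartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 6; // assumed partition count of the target topic
        byte[] keyBytes = "order-42".getBytes(StandardCharsets.UTF_8);
        // same computation DefaultPartitioner uses for records that carry a key
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key 'order-42' -> partition " + partition);
    }
}

Note that this mapping only stays stable while the topic's partition count is unchanged; adding partitions redistributes keys.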
Back to step 3 of the doSend() flow above: after the target partition has been determined, RecordAccumulator#append() is called to wrap the message into a ProducerBatch and buffer it. The key processing of this step is listed below, and the data structure of the accumulator is sketched after the list:
1. First call RecordAccumulator#getOrCreateDeque() to get the Deque (double-ended queue) for the target topic partition; each ProducerBatch object in this queue is a buffer of messages for one partition.
2. Call RecordAccumulator#tryAppend() to try to append the message to the record collection inside an existing ProducerBatch. If it cannot be appended, create a new ProducerBatch, create a MemoryRecordsBuilder as its internal record collection, and then call ProducerBatch#tryAppend() to append the message data.
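As a rough stand-in for the structure diagram referenced above, here is a simplified sketch (not the actual Kafka classes) of the mapping the accumulator maintains: one queue of batches per partition, appended to at the tail and drained from the head by the Sender.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AccumulatorSketch {
    // stand-in for ProducerBatch, which wraps a MemoryRecordsBuilder plus batch metadata
    static class ProducerBatchSketch { }

    // partition -> queue of batches; append() writes into the batch at the tail,
    // while the Sender drains full batches from the head
    final ConcurrentMap<String, Deque<ProducerBatchSketch>> batches = new ConcurrentHashMap<>();

    Deque<ProducerBatchSketch> getOrCreateDeque(String topicPartition) {
        return batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
    }
}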
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
Once the message data has been buffered, Sender#wakeup() wakes up the underlying network client to perform network I/O, which brings us into Sender#run(). As shown below, the core of this method is the call to Sender#runOnce().
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// Abort the transaction if any commit or abort didn't go through the transaction manager's queue
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
Sender#runOnce() also handles transactional messages, which we will not go into here. Its key actions fall into two steps:
1. Call Sender#sendProducerData() to build the Produce requests for message production and queue them for sending.
2. Call NetworkClient#poll() to trigger the underlying network client to perform the actual network I/O.
void runOnce() {
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
// do not continue sending if the transaction manager is in a failed state
if (transactionManager.hasFatalError()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}
// Check whether we need a new producerId. If so, we will enqueue an InitProducerId
// request which will be sent below
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
The key processing in Sender#sendProducerData() is as follows:
1. First call RecordAccumulator#ready() to collect the broker nodes hosting the leader replicas of the target topics' partitions, then call NetworkClient#ready() to detect nodes whose connections are not yet ready and remove them.
2. Call RecordAccumulator#drain() to group the buffered message data by target node; during this process ProducerBatch#close() is called to wrap the message data into MemoryRecords objects.
3. Call Sender#sendProduceRequests() to iterate over the grouped messages and send them to their target nodes wrapped in Produce requests.
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
}
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}
Due to space constraints, we will not dig into how the message data is grouped by target node; let us go straight to Sender#sendProduceRequests(), which sends the messages. Its core is a call to Sender#sendProduceRequest() for each target node in turn. The key processing of Sender#sendProduceRequest() is as follows; with it, the sending of produced messages is essentially complete.
1. Iterate over the list of message batches, call ProducerBatch#records() to obtain the message data to send, wrap it into ProduceRequestData.PartitionProduceData, and finally package everything into a ProduceRequest.Builder, the builder that will construct the Produce request.
2. Call NetworkClient#newClientRequest() to construct the underlying ClientRequest object, which wraps the ProduceRequest.Builder and sets Sender#handleProduceResponse() as the response callback.
3. Call NetworkClient#send() to put the client request into the send buffer, where it waits for NetworkClient#poll() to trigger the actual network I/O. Readers unfamiliar with this part of the flow can refer to Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析.
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
        if (tpData == null) {
            tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
            tpd.add(tpData);
        }
        tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                .setIndex(tp.partition())
                .setRecords(records));
        recordsByPartition.put(tp, batch);
    }
    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, new ProduceRequestData()
            .setAcks(acks)
            .setTimeoutMs(timeout)
            .setTransactionalId(transactionalId)
            .setTopicData(tpd));
    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
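Finally, from the caller's perspective the round trip completes when Sender#handleProduceResponse() handles the broker's Produce response, at which point the Future returned by KafkaProducer#send() is completed. Below is a minimal sketch of waiting for that acknowledgment synchronously; the topic name and values are assumptions, and the producer is assumed to be configured as in the quick-start sketch at the top of this section:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.Future;

public class SyncSendDemo {
    static RecordMetadata sendAndWait(KafkaProducer<String, String> producer) throws Exception {
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>("demo-topic", "order-42", "hello"));
        // future.get() blocks until the Produce response has been handled,
        // i.e. until the broker has acknowledged the batch according to the acks setting
        return future.get();
    }
}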



