- Producer
- 1 Developing the message producer
- 1.1 Process
- 1.2 Code implementation
- 1.3 Key configuration parameters
- 1.4 Sending messages
- 2 How it works
- 2.1 Basics
- 2.2 Interceptors
- 2.2.1 Basic structure
- 2.2.2 Custom interceptor
- 2.3 Serializers
- 2.3.1 Basic methods
- 2.3.2 Custom serializer
- 2.4 Partitioners
- 2.4.1 Basic methods
- 2.4.2 Custom partitioner
- 2.5 The message accumulator
- 2.5.1 Basics
- 2.6 The Sender thread
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MySimpleProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Set the configuration parameters
        Properties props = new Properties();
        // Broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
        // Serialize the key from String to a byte array
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serialize the value from String to a byte array
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks: 1 = wait for the leader's acknowledgment
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // Send buffer size (default 32 MB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Batch size (default 16 KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Send after 10 ms even if the batch has not reached 16 KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // 2. Create the producer client with these parameters
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 3. Create the message
        // key: determines which partition the message goes to; value: the message payload
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "message", "四川");
        // 4. Send the message synchronously and print the returned metadata
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
        producer.close();
    }
}
1.3 Key configuration parameters
- bootstrap.servers: specifies the broker address list the producer client uses to connect to the Kafka cluster, in the format host1:port1,host2:port2. One or more addresses may be set, separated by commas.
- key.serializer and value.serializer: the broker only accepts messages in byte-array (byte[]) form, so these two parameters specify the serializers for the key and the value respectively. Neither has a default value, and each must be set to the serializer's fully qualified class name.
- acks:
  - 0 means the producer sends the next message (or batch) without waiting for any acknowledgment from the broker.
  - 1 means the producer waits for the leader to receive the data and acknowledge it before sending the next message.
  - -1 (all) means the producer waits for the followers' (i.e. all in-sync replicas') acknowledgment before sending the next message.
  - Balancing performance against reliability, Kafka's default acks is 1 (note that since Kafka 3.0 the client default has changed to all).
- buffer.memory: the Kafka client does not send each record to the server as it arrives; records are buffered. Messages sent through KafkaProducer first go into a local in-memory buffer, where many messages are collected into batches that are then sent to the broker, which is what makes high throughput possible. Default 32 MB.
- batch.size: the size of a single producer batch, default 16 KB.
- retries: the number of retries after a send failure.
# broker address
spring.kafka.bootstrap-servers=ip:9093
######### producer ############
# acks
spring.kafka.producer.acks=1
# batch size
spring.kafka.producer.batch-size=16384
# retries
spring.kafka.producer.retries=10
# buffer size
spring.kafka.producer.buffer-memory=33554432
# serializers
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

1.4 Sending messages
After the producer instance has been created, the next step is to build the message, i.e. to create a ProducerRecord object.
// 3. Create the message
// key: determines which partition the message goes to; value: the message payload
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "message", "四川");
- ProducerRecord fields and constructors
// Member fields
private final String topic;          // topic
private final Integer partition;     // partition
private final Headers headers;       // headers
private final K key;
private final V value;
private final Long timestamp;        // timestamp

// Constructors
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null.");
    if (timestamp != null && timestamp < 0)
        throw new IllegalArgumentException(
                String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
    if (partition != null && partition < 0)
        throw new IllegalArgumentException(
                String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
    this.topic = topic;
    this.partition = partition;
    this.key = key;
    this.value = value;
    this.timestamp = timestamp;
    this.headers = new RecordHeaders(headers);
}

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    this(topic, partition, timestamp, key, value, null);
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, null, key, value, headers);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}

public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}

public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}
- KafkaProducer constructors and send() methods
// Constructors
public KafkaProducer(final Map<String, Object> configs) {
    this(configs, null, null, null, null, null, Time.SYSTEM);
}
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(configs, keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}
public KafkaProducer(Properties properties) {
    this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
}
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(propsToMap(properties), keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}

// Send methods
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
Calling get() right after send() blocks until the send completes and returns a RecordMetadata object. RecordMetadata contains metadata about the message, such as its topic, partition number, offset within the partition, and timestamp, in case the application code needs this information.
// 4. Send the message synchronously and print the returned metadata
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
2 How it works
2.1 Basics

On its way to the broker via send(), a message may pass through a chain of interceptors (Interceptor), a serializer (Serializer), and a partitioner (Partitioner) before it is actually sent to the broker.
// Member fields of KafkaProducer
private final String clientId;
// Visible for testing
final Metrics metrics;
// Partitioner
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final ProducerMetadata metadata;
// Record accumulator
private final RecordAccumulator accumulator;
// Sender thread
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
// Serializers
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
// Configuration
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
// Interceptors
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
// Transaction manager
private final TransactionManager transactionManager;
- The producer client is driven by two cooperating threads: the main thread and the Sender thread (the sending thread).
- In the main thread, KafkaProducer creates the message, which then passes through any interceptors, the serializer, and the partitioner before being cached in the record accumulator (RecordAccumulator, also called the message collector). The Sender thread fetches messages from the RecordAccumulator and sends them to Kafka.
- The RecordAccumulator's main job is to cache messages so the Sender thread can send them in batches, reducing network overhead and improving performance.
- The size of the RecordAccumulator's cache is set by the producer client parameter buffer.memory, whose default is 33554432 bytes, i.e. 32 MB.
- If the producer creates messages faster than they can be sent to the server, the producer runs out of buffer space; at that point a call to KafkaProducer's send() either blocks or throws an exception, depending on the parameter max.block.ms, whose default is 60000, i.e. 60 seconds.
- Messages travel over the network as bytes, so before sending, a region of memory must be allocated to hold each message. The Kafka producer client creates and releases this message memory via java.nio.ByteBuffer.
- Since frequent allocation and release is expensive, the RecordAccumulator also contains a BufferPool, which reuses ByteBuffers so the cache is used efficiently.
- The size of a ProducerBatch is closely related to the batch.size parameter.
- When a new ProducerBatch is created, the message's size is compared against batch.size; if it does not exceed batch.size, the ProducerBatch is created with size batch.size.
- After the Sender thread fetches the cached messages from the RecordAccumulator, it converts them from the form <partition, Deque<ProducerBatch>> into <Node, List<ProducerBatch>>, where Node represents a broker node of the Kafka cluster.
- After the conversion to <Node, List<ProducerBatch>>, the Sender further wraps them as <Node, Request> so that requests can be sent to each Node. Request here means any Kafka protocol request; for message sending specifically, it is a ProduceRequest.
- Before being sent from the Sender thread to Kafka, a request is also stored in InFlightRequests, whose concrete form is Map<NodeId, Deque<Request>>. Its main purpose is to cache requests that have been sent but not yet acknowledged (NodeId is a String identifying the node); it can also be used to limit the number of in-flight requests per connection.
- InFlightRequests can also determine the leastLoadedNode, the Node with the lowest load among all Nodes. Load is measured by the number of unacknowledged requests each Node has in InFlightRequests: the more unacknowledged requests, the higher the load. Sending to the least loaded node first avoids letting network congestion or other problems on one node stall overall progress.
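The leastLoadedNode selection described above can be sketched in plain Java. This is a simplified model, not Kafka's actual implementation; the node names and the Map layout are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class LeastLoadedNode {
    // Pick the node with the fewest in-flight (unacknowledged) requests,
    // mirroring how InFlightRequests derives the leastLoadedNode.
    static String leastLoadedNode(Map<String, Deque<String>> inFlight) {
        String best = null;
        int bestLoad = Integer.MAX_VALUE;
        for (Map.Entry<String, Deque<String>> e : inFlight.entrySet()) {
            int load = e.getValue().size();
            if (load < bestLoad) {
                bestLoad = load;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> inFlight = new HashMap<>();
        inFlight.put("node-0", new ArrayDeque<>(java.util.List.of("req1", "req2")));
        inFlight.put("node-1", new ArrayDeque<>(java.util.List.of("req3")));
        inFlight.put("node-2", new ArrayDeque<>(java.util.List.of("req4", "req5", "req6")));
        // node-1 has only one unacknowledged request, so it is the least loaded
        System.out.println(leastLoadedNode(inFlight)); // prints node-1
    }
}
```

In the real client the deques hold Request objects keyed by NodeId; the principle, counting the pending entries per node, is the same.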
2.2 Interceptors

- Interceptors (Interceptor) were introduced as early as Kafka 0.10.0.0. Kafka has two kinds of interceptor: producer interceptors and consumer interceptors.
- A producer interceptor can do preparatory work before a message is sent, such as filtering out messages that do not match some rule or modifying message content, and it can also do custom work before the send-callback logic runs, such as collecting statistics.
2.2.1 Basic structure

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
- configure(Map<String, ?> configs)
  Called when the interceptor is initialized; used to obtain the producer's configuration.
- onSend(ProducerRecord<K, V> record)
  Called before the message is serialized, receiving the record to be sent. The user may modify the record here in any way, including its key and value as well as the target topic and partition.
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    // Walk the list of registered interceptors
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not propagate interceptor exception, log and continue calling other interceptors
            // be careful not to throw exception from here
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}
- onAcknowledgement(RecordMetadata metadata, Exception exception)
  Called when a record sent to the server has been acknowledged, or when sending failed (before the producer's own callback logic is triggered). The metadata object carries the message's topic, partition, offset, and so on; the exception object carries the failure, if any.

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    int partition = metadata.partition();
    String topic = metadata.topic();
    long offset = metadata.offset();
    String str = metadata.toString();
    long timestamp = metadata.timestamp();
}

- close()
  Closes the interceptor and releases its resources; called when the producer is closed.
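A custom interceptor such as the TimeInterceptor in this section only takes effect once it is registered with the producer through the interceptor.classes property. A minimal sketch (the class name matches the example; multiple interceptors may be listed, separated by commas, and are invoked in list order):

```properties
# fully qualified class name(s) of the producer interceptor(s)
interceptor.classes=com.Interceptor.TimeInterceptor
```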
2.2.2 Custom interceptor

package com.Interceptor;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("TimeInterceptor-------onSend called");
        // Create a new record with the timestamp prepended to the message body
        ProducerRecord<String, String> proRecord = new ProducerRecord<>(
                record.topic(),
                record.key(),
                System.currentTimeMillis() + "," + record.value());
        return proRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exception) {
        System.out.println("TimeInterceptor-------onAcknowledgement called");
    }

    @Override
    public void close() {
        System.out.println("TimeInterceptor-------close called");
    }
}

2.3 Serializers
2.3.1 Basic methods
- The producer must use a serializer (Serializer) to turn objects into byte arrays before they can be sent to Kafka over the network. On the other side, the consumer uses a deserializer (Deserializer) to turn the byte arrays received from Kafka back into the corresponding objects.
- Besides the serializer for the String type, Kafka ships serializers for the ByteArray, ByteBuffer, Bytes, Double, Integer, and Long types.
byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
    // a record key that does not match the configured key.serializer is reported as a SerializationException
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " specified in key.serializer", cce);
}
- Built-in serializers
public class BytesSerializer implements Serializer<Bytes> {
    public byte[] serialize(String topic, Bytes data) {
        if (data == null)
            return null;
        return data.get();
    }
}

public class ByteBufferSerializer implements Serializer<ByteBuffer> {
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null)
            return null;
        data.rewind();
        if (data.hasArray()) {
            byte[] arr = data.array();
            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                return arr;
            }
        }
        byte[] ret = new byte[data.remaining()];
        data.get(ret, 0, ret.length);
        data.rewind();
        return ret;
    }
}
- Basic methods
default void configure(Map<String, ?> configs, boolean isKey) {
    // intentionally left blank
}

byte[] serialize(String topic, T data);

default byte[] serialize(String topic, Headers headers, T data) {
    return serialize(topic, data);
}

@Override
default void close() {
    // intentionally left blank
}
2.3.2 Custom serializer
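The original custom-serializer example is truncated, so here is a hedged sketch of what a JSON serializer can look like. To stay dependency-free it hand-rolls the JSON for a small User type; real code would typically implement org.apache.kafka.common.serialization.Serializer and delegate to a JSON library such as Jackson. The User class and its field names are illustrative, not from the original:

```java
import java.nio.charset.StandardCharsets;

public class JsonSerializerSketch {
    // Illustrative payload type (an assumption, not from the document)
    static class User {
        final String name;
        final int age;
        User(String name, int age) { this.name = name; this.age = age; }
    }

    // Mirrors Serializer<User>.serialize(String topic, User data):
    // turn the object into the byte[] form the broker requires.
    static byte[] serialize(String topic, User data) {
        if (data == null)
            return null; // Kafka serializers conventionally return null for null payloads
        String json = "{\"name\":\"" + data.name + "\",\"age\":" + data.age + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("my-topic", new User("alice", 30));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints {"name":"alice","age":30}
    }
}
```

The matching consumer-side Deserializer would parse the same bytes back into a User.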
2.4 Partitioners
2.4.1 Basic methods
- Once a message has been serialized, its destination partition must be determined. If the ProducerRecord specifies the partition field, no partitioner is involved, because partition is exactly the number of the partition to send to.
- If the ProducerRecord does not specify partition, the partitioner computes the value of partition from the key field. The partitioner's job is to assign a partition to each message.
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
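The keyed path of the DefaultPartitioner shown next boils down to "non-negative hash of the key bytes, modulo the partition count". A simplified, dependency-free model of that rule (Kafka actually hashes with MurmurHash2 via Utils.murmur2; Arrays.hashCode stands in here only so the sketch runs on its own):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Force the hash to be non-negative, like Kafka's Utils.toPositive:
    // mask off the sign bit rather than Math.abs (which breaks on Integer.MIN_VALUE)
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Choose a partition from the serialized key, mirroring
    // DefaultPartitioner's keyed path: toPositive(hash(keyBytes)) % numPartitions
    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "message".getBytes(StandardCharsets.UTF_8);
        int p = partitionForKey(key, 3);
        // The same key always maps to the same partition
        System.out.println(p == partitionForKey(key, 3)); // prints true
    }
}
```

This determinism is why records with the same key land in the same partition and therefore keep their relative order.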
- Basic methods
package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    // Initialize from configuration
    public void configure(Map<String, ?> configs) {}

    // Compute the partition
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            // No key: use the sticky partition for this topic
            return stickyPartitionCache.partition(topic, cluster);
        }
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    // Release resources
    public void close() {}

    // If the current sticky partition's batch is complete, change the sticky partition;
    // alternatively, if no sticky partition has been determined yet, set one.
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}
2.4.2 Custom partitioner
public class PhonenumPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the topic's partition info
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Simulate special customer-service numbers
        if (key.toString().equals("10000") || key.toString().equals("11111")) {
            // Route them to the last partition
            return numPartitions - 1;
        }
        // Route everything else by phone-number prefix; mask off the sign bit
        // so a negative hashCode cannot yield a negative partition number
        String phoneNum = key.toString();
        return (phoneNum.substring(0, 3).hashCode() & 0x7fffffff) % (numPartitions - 1);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
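Like interceptors, a custom partitioner only takes effect after being registered with the producer, via the partitioner.class property. A minimal sketch; the package name is illustrative, since the example above does not declare one:

```properties
# fully qualified class name of the custom partitioner
partitioner.class=com.example.PhonenumPartitioner
```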
2.5 The message accumulator
2.5.1 Basics
- In the main thread, KafkaProducer creates the message, which then passes through any interceptors, the serializer, and the partitioner before being cached in the record accumulator (RecordAccumulator, also called the message collector). The Sender thread is responsible for fetching messages from the RecordAccumulator and sending them to Kafka.
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true);
- Basic methods
// Constructor
public RecordAccumulator(LogContext logContext,
                         int batchSize,
                         CompressionType compression,
                         int lingerMs,
                         long retryBackoffMs,
                         int deliveryTimeoutMs,
                         Metrics metrics,
                         String metricGrpName,
                         Time time,
                         ApiVersions apiVersions,
                         TransactionManager transactionManager,
                         BufferPool bufferPool) {}

// Append a record to the accumulator
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    try {
        // Get the deque for the topic partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
    }
}

// Get (or create) the deque for a given topic partition
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}

// Try to append to the last ProducerBatch. If it is full, return null so a new batch is created.
// The batch is also closed for record appends, to free resources such as compression buffers.
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
- The size of a ProducerBatch is closely related to the batch.size parameter. When a message (ProducerRecord) flows into the RecordAccumulator, the deque for its partition is looked up first (and created if absent); then a ProducerBatch is taken from the tail of that deque (and created if absent). If the ProducerRecord still fits into that ProducerBatch, it is written in; otherwise a new ProducerBatch must be created. When creating a new ProducerBatch, the message's size is compared against batch.size: if it does not exceed batch.size, the ProducerBatch is created with size batch.size, so that after use the memory region can be reused under the BufferPool's management; if it does exceed batch.size, the ProducerBatch is created at the estimated size, and that memory region will not be reused.
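The sizing rule above can be sketched as a small decision function. This is a simplified model of RecordAccumulator's allocation behavior, not Kafka's actual code; the method names are illustrative:

```java
public class BatchSizing {
    // How large a buffer to allocate for a brand-new ProducerBatch:
    // small messages are rounded up to batch.size so the buffer is poolable;
    // oversized messages get an exact-size, non-reusable buffer.
    static int allocationSize(int estimatedRecordSize, int batchSize) {
        return Math.max(estimatedRecordSize, batchSize);
    }

    // Only buffers of exactly batch.size are returned to the BufferPool for reuse
    static boolean reusableViaBufferPool(int estimatedRecordSize, int batchSize) {
        return estimatedRecordSize <= batchSize;
    }

    public static void main(String[] args) {
        int batchSize = 16384; // 16 KB, the batch.size default
        // a 1 KB record still gets a 16 KB, poolable buffer
        System.out.println(allocationSize(1024, batchSize));          // prints 16384
        System.out.println(reusableViaBufferPool(1024, batchSize));   // prints true
        // a 20 KB record gets an exact-size buffer that is never pooled
        System.out.println(allocationSize(20480, batchSize));         // prints 20480
        System.out.println(reusableViaBufferPool(20480, batchSize));  // prints false
    }
}
```

This is also why records much larger than batch.size hurt producer performance: every one of them costs a fresh allocation that bypasses the BufferPool.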
2.6 The Sender thread

- After the Sender fetches the cached messages from the RecordAccumulator, it converts them from the form <partition, Deque<ProducerBatch>> into <Node, List<ProducerBatch>>, where Node represents a broker node of the Kafka cluster.
// Constructor
public Sender(LogContext logContext,
              KafkaClient client,
              ProducerMetadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              SenderMetricsRegistry metricsRegistry,
              Time time,
              int requestTimeoutMs,
              long retryBackoffMs,
              TransactionManager transactionManager,
              ApiVersions apiVersions) {}

// Waking up the Sender thread (in KafkaProducer.doSend, when a batch is full or newly created)
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}

private final KafkaClient client;

public void wakeup() {
    this.client.wakeup();
}
// Check the record accumulator for data that is ready, collecting the ready nodes into a set
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            // When producing to a large number of partitions, this path is hot and deques are often empty.
            // We check whether a batch exists first to avoid the more expensive checks whenever possible.
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

public final static class ReadyCheckResult {
    public final Set<Node> readyNodes;
    public final long nextReadyCheckDelayMs;
    public final Set<String> unknownLeaderTopics;

    public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, Set<String> unknownLeaderTopics) {
        this.readyNodes = readyNodes;
        this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
        this.unknownLeaderTopics = unknownLeaderTopics;
    }
}
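The heart of ready() is its `sendable` condition. A pure-Java sketch of just that decision, simplified (it ignores muted partitions and unknown leaders, and the parameter names are illustrative):

```java
public class ReadyCheck {
    // A batch may be sent when any trigger holds (full batch, linger expired,
    // buffer exhausted, producer closing, or flush in progress), unless it is
    // still backing off after a failed attempt.
    static boolean sendable(boolean full, long waitedTimeMs, long lingerMs,
                            boolean bufferExhausted, boolean closed, boolean flushInProgress,
                            int previousAttempts, long retryBackoffMs) {
        boolean backingOff = previousAttempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        return (full || expired || bufferExhausted || closed || flushInProgress) && !backingOff;
    }

    public static void main(String[] args) {
        // A full batch is sendable immediately
        System.out.println(sendable(true, 0, 10, false, false, false, 0, 100));   // prints true
        // A non-full batch becomes sendable once linger.ms has elapsed
        System.out.println(sendable(false, 5, 10, false, false, false, 0, 100));  // prints false
        System.out.println(sendable(false, 10, 10, false, false, false, 0, 100)); // prints true
        // A retried batch waits out retry.backoff.ms even if it is full
        System.out.println(sendable(true, 50, 10, false, false, false, 1, 100));  // prints false
    }
}
```

Reading the real ready() with this reduction in mind makes the interplay of linger.ms, batch.size, and retry.backoff.ms much easier to follow.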
// The Sender thread sends the producer data
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send (taken from the record accumulator)
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // remove any nodes we aren't ready to send to yet
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }
    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    // send the requests
    sendProduceRequests(batches, now);
}
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

// Create a produce request from the given record batches
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
// Stepping into the NetworkClient class
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(send);
}
// From here on we enter the network-communication layer