栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka之消息生产者基本知识

kafka之消息生产者基本知识

文章目录
  • 生产者
    • 一 消息提供者开发
      • 1.1 过程
      • 1.2 代码实现
      • 1.3 重点配置参数
      • 1.4 消息的发送
    • 二 原理解析
      • 2.1基本知识
      • 2.2 拦截器
        • 2.2.1 基本结构
        • 2.2.2 自定义拦截器
      • 2.3 序列化器
        • 2.3.1 基本方法
        • 2.3.2 自定义序列化器
      • 2.4 分区器
        • 2.4.1 基本方法
        • 2.4.2 自定义分区器
      • 2.5 消息累加器
        • 2.1 基本知识
      • 2.6 Sender线程

生产者 一 消息提供者开发 1.1 过程

1.2 代码实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka producer example: sends one message synchronously and prints
 * the returned metadata (topic / partition / offset).
 */
public class MySimpleProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Producer configuration.
        Properties props = new Properties();
        // Broker address list: host:port, comma-separated for multiple brokers.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
        // NOTE: group.id is a consumer-side setting; the original code supplied
        // ConsumerConfig.GROUP_ID_CONFIG here, which a producer ignores (and
        // warns about), so it has been removed.
        // Serialize keys and values from String to byte[].
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=1: wait only for the partition leader's acknowledgement.
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // Total memory for buffering records awaiting transmission (default 32 MB).
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Target batch size in bytes (default 16 KB).
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Send a partially filled batch after 10 ms instead of waiting for it to fill.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        // 2. Create the producer client. try-with-resources guarantees close(),
        // which flushes any records still sitting in the accumulator.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Build the record: the key influences the target partition,
            // the value is the message payload.
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>(TOPIC_NAME, "message", "四川");
            // 4. Send synchronously (get() blocks until the broker responds)
            // and print the resulting metadata. The class is RecordMetadata —
            // the original "Recordmetadata" does not compile.
            RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("topic-" + metadata.topic()
                    + "|partition-" + metadata.partition()
                    + "|offset-" + metadata.offset());
        }
    }
}
1.3 重点配置参数
  • bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开。

  • key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在,key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名。

  • acks:

    • 0 意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息。
    • 1意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。
    • -1意味着producer得到所有ISR副本(follower)的确认,才发送下一条数据。
    • 综合性能与效率来看,kafka默认ack为1
  • buffer-memory:Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。默认32M。

  • batch-size:生产者批量发送时单个批次(batch)的大小上限,默认16k

  • retries:重试次数

# lead机器
spring.kafka.bootstrap-servers=ip:9093
#########producer############
# ack
spring.kafka.producer.acks=1
# 批次大小
spring.kafka.producer.batch-size=16384
# 重试次数
spring.kafka.producer.retries=10
# 缓冲区大小
spring.kafka.producer.buffer-memory=33554432
# 序列化
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
1.4 消息的发送

在创建完生产者实例之后,接下来的工作就是构建消息,即创建ProducerRecord对象。

		 //3.创建消息
        //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
        ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME,"message","四川");
  • ProducerRecord方法
	//成员变量
	private final String topic;//主题
    private final Integer partition;//分区
    private final Headers headers;//header
    private final K key;
    private final V value;
    private final Long timestamp;//时间

//构造器
 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null."); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException( String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); if (partition != null && partition < 0) throw new IllegalArgumentException( String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; this.headers = new RecordHeaders(headers); } public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { this(topic, partition, timestamp, key, value, null); } public ProducerRecord(String topic, Integer partition, K key, V value, Iterable
headers) { this(topic, partition, null, key, value, headers); } public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, null, key, value, null); } public ProducerRecord(String topic, K key, V value) { this(topic, null, null, key, value, null); } public ProducerRecord(String topic, V value) { this(topic, null, null, null, value, null); }
  • KafkaProducer方法
//构造器
public KafkaProducer(final Map configs) {
        this(configs, null, null, null, null, null, Time.SYSTEM);
    }
    
public KafkaProducer(Map configs, Serializer keySerializer, Serializer valueSerializer) {
        this(configs, keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
    }

public KafkaProducer(Properties properties) {
        this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
    }


public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) {
        this(propsToMap(properties), keySerializer, valueSerializer, null, null, null,
                Time.SYSTEM);
    }

//发送方法

@Override
    public Future send(ProducerRecord record) {
        return send(record, null);
    }


 @Override
    public Future send(ProducerRecord record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

执行完send()方法之后直接调用get()方法,这样可以获取一个RecordMetadata对象,在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些信息,可以从这里获取。

 //4.发送消息,得到消息发送的元数据并输出
        Recordmetadata metadata = producer.send(producerRecord).get();
        System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset())
二 原理解析

消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。

2.1基本知识
//成员变量
private final String clientId;
// Visible for testing
final Metrics metrics;
//分区器
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final Producermetadata metadata;
//累加器
private final RecordAccumulator accumulator;
//Sender线程
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
// 序列化器
private final Serializer keySerializer;
private final Serializer valueSerializer;
//配置文件
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
//拦截器
private final ProducerInterceptors interceptors;
private final ApiVersions apiVersions;
//事务管理器
private final TransactionManager transactionManager;
  • 整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。

  • 在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

  • RecordAccumulator主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

  • RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为 33554432B,即 32MB。

  • 如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

  • 消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。

  • 不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。

  • ProducerBatch的大小和batch.size参数也有着密切的关系。

  • 在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建ProducerBatch。

  • Sender线程从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。

  • 在转换成<Node,List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest

  • 请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String 类型,表示节点的 id 编号),可以限制连接大小。

  • InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。优先发送负载最小的,避免因网络拥塞等异常而影响整体的进度。

2.2 拦截器 2.2.1 基本结构
  • 拦截器(Interceptor)是早在Kafka 0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器
  • 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
public Future send(ProducerRecord record, Callback callback) {
ProducerRecord interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
    }

  • configure(Map configs)
    该方法在初始化数据的时候被调用,用于获取生产者的配置信息
  • onSend(ProducerRecord)
    该方法在消息被序列化之前调用,并传入要发送的消息记录。用户可以在该方法中对消息记录进行任意的修改,包括消息的key和value以及要发送的主题和分区等。
public ProducerRecord onSend(ProducerRecord record) {
        ProducerRecord interceptRecord = record;//集合
        for (ProducerInterceptor interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
  • onAcknowledgement(Recordmetadata metadata, Exception exception)
    该方法在发送到服务器的记录已被确认或者记录发送失败时调用(在生产者回调逻辑触发之前),可以在metadata对象中获取消息的主题、分区和偏移量等信息,在exception对象中获取消息的异常信息。

    @Override
    public void onAcknowledgement(Recordmetadata metadata, Exception exception) {
        int partition = metadata.partition();
        String topic = metadata.topic();
        long offset = metadata.offset();
        String str = metadata.toString();
        long timestamp = metadata.timestamp();
    }
    
  • close()该方法用于关闭拦截器并释放资源。当生产者关闭时将调用该方法。

2.2.2 自定义拦截器
package com.Interceptor;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Producer interceptor that prepends the current wall-clock time (millis)
 * to each message value before serialization.
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /** Called once at startup with the producer's configuration map. */
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    }

    /**
     * Called before serialization and partitioning. Returns a new record whose
     * value is "&lt;timestamp&gt;,&lt;original value&gt;". Topic, partition,
     * timestamp, key and headers are all preserved — the original code used the
     * (topic, key, value) constructor and silently dropped partition, timestamp
     * and headers.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("TimeInterceptor-------onSend方法被调用");
        return new ProducerRecord<>(
                record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value(),
                record.headers());
    }

    /**
     * Called when the broker acknowledges the record, or when the send fails
     * (exception non-null). Runs before the user callback; keep it fast.
     * The correct parameter type is RecordMetadata (the original
     * "Recordmetadata" does not exist in the Kafka client API).
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("TimeInterceptor-------onAcknowledgement方法被调用");
    }

    /** Called when the producer closes; release interceptor resources here. */
    @Override
    public void close() {
        System.out.println("TimeInterceptor-------close方法被调用");
    }

}
2.3 序列化器 2.3.1 基本方法
  • 生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。
  • 除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型
byte[] serializedKey;
try {
   serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    }

  • 序列化
// Kafka's built-in serializer for Bytes values: the payload is already a
// byte wrapper, so serialization just unwraps the underlying byte[].
// (Quoted from the Kafka client source; the generic parameter <Bytes> was
// lost in extraction.)
public class BytesSerializer implements Serializer {
    // Returns the wrapped byte[] unchanged; a null payload stays null
    // (Kafka treats a null value as a tombstone).
    public byte[] serialize(String topic, Bytes data) {
        if (data == null)
            return null;

        return data.get();
    }
}


// Kafka's built-in serializer for ByteBuffer values. (Quoted from the Kafka
// client source; the generic parameter <ByteBuffer> was lost in extraction.)
public class ByteBufferSerializer implements Serializer {
    // Converts a ByteBuffer to a byte[]; null stays null (tombstone).
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null)
            return null;

        // Reset position so the whole buffer content is considered.
        data.rewind();

        // Fast path: if the buffer is array-backed and the array exactly
        // matches the readable region, return the backing array directly
        // without copying.
        if (data.hasArray()) {
            byte[] arr = data.array();
            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                return arr;
            }
        }

        // Slow path: copy the remaining bytes into a fresh array, then
        // rewind again so the buffer is left readable for the caller.
        byte[] ret = new byte[data.remaining()];
        data.get(ret, 0, ret.length);
        data.rewind();
        return ret;
    }
}
  • 基本方法
 	
    default void configure(Map configs, boolean isKey) {
        // intentionally left blank
    }

    
    byte[] serialize(String topic, T data);

    
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    
    @Override
    default void close() {
       
    }
2.3.2 自定义序列化器
// Custom value serializer that converts any object to its JSON byte
// representation. Relies on the third-party fastjson JSON helper, not the
// Kafka client library.
public class JsonSerializer implements Serializer {

    // NOTE(review): JSON.toJSONBytes(null) presumably returns the bytes of
    // the literal "null" rather than a null array — confirm this is the
    // intended tombstone behaviour before using with compacted topics.
    @Override
    public byte[] serialize(String s, Object object) {
        return JSON.toJSONBytes(object);
    }
}
 
2.4 分区器 
2.4.1 基本方法 
  • 消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
  • 如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
 			int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

  • 基本方法
package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// Kafka's default partitioner (quoted from the client source; generic
// parameters were lost in extraction). Keyed records are hashed with
// murmur2; keyless records use the "sticky" partition strategy so that a
// whole batch goes to one partition.
public class DefaultPartitioner implements Partitioner {

    // Caches the current sticky partition per topic for keyless records.
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    // Initialization hook; the default partitioner needs no configuration.
    public void configure(Map configs) {}

    // Computes the target partition for a record.
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // No key: reuse the sticky partition so the batch fills up faster.
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        // (toPositive masks the sign bit so the modulo result is non-negative)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    // Resource cleanup; nothing to release here.
    public void close() {}
   
    // Called when the current sticky partition's batch has completed: rotate
    // to a new sticky partition (or establish one if none is set yet).
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

2.4.2 自定义分区器
/**
 * Custom partitioner that routes a few special "customer service" numbers to
 * the last partition and hashes every other key by its 3-character prefix
 * across the remaining partitions.
 */
public class PhonenumPartitioner implements Partitioner {

    /** Initialization hook; no configuration needed. */
    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    /**
     * Computes the target partition. Fixes in this version:
     * - null key no longer throws NPE (String.valueOf),
     * - keys shorter than 3 chars no longer throw StringIndexOutOfBounds,
     * - hashCode() sign bit is masked so a negative partition is never returned,
     * - a topic with a single partition no longer divides by zero.
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Number of partitions for this topic.
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        String phoneNum = String.valueOf(key);
        // Special service accounts always go to the last partition.
        if (phoneNum.equals("10000") || phoneNum.equals("11111")) {
            return numPartitions - 1;
        }
        // With one partition there is nothing to hash over.
        if (numPartitions == 1) {
            return 0;
        }
        // Hash the (up to) 3-char prefix over the remaining partitions;
        // mask off the sign bit because hashCode() may be negative.
        String prefix = phoneNum.length() >= 3 ? phoneNum.substring(0, 3) : phoneNum;
        return (prefix.hashCode() & 0x7fffffff) % (numPartitions - 1);
    }

    /** Resource cleanup; nothing to release. */
    @Override
    public void close() {
        // nothing to release
    }

}
2.5 消息累加器 2.1 基本知识
  • 在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
     RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true);
            
  • 基本方法
//构造器
public RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             int lingerMs,
                             long retryBackoffMs,
                             int deliveryTimeoutMs,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool) {}

//追加记录
public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch) throws InterruptedException {
    
    try {
            //获取主题的双端队列
            Deque dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, 				callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
}
 
 //获取给定主题分区的双端队列   
 private final ConcurrentMap> batches;
 private Deque getOrCreateDeque(TopicPartition tp) {
        Deque d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }   
    
    
 //尝试附加到 ProducerBatch。 如果它已满,我们返回 null 并创建一个新批次。 我们还关闭了记录追加的批处理,以释放压缩缓冲区等资源。   
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque deque) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordmetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }    
    
  • ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
2.6 Sender线程
  • Sender 从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。
// 构造器
public Sender(LogContext logContext,
                  KafkaClient client,
                  Producermetadata metadata,
                  RecordAccumulator accumulator,
                  boolean guaranteeMessageOrder,
                  int maxRequestSize,
                  short acks,
                  int retries,
                  SenderMetricsRegistry metricsRegistry,
                  Time time,
                  int requestTimeoutMs,
                  long retryBackoffMs,
                  TransactionManager transactionManager,
                  ApiVersions apiVersions) {}
//唤醒sender线程
if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }



private final KafkaClient client;

public void wakeup() {
        this.client.wakeup();
    }

//消息累加器中检查已准备好的数据,转换成set
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
   Set readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set unknownLeaderTopics = new HashSet<>();

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry> entry : this.batches.entrySet()) {
            Deque deque = entry.getValue();
            synchronized (deque) {
                // When producing to a large number of partitions, this path is hot and deques are often empty.
                // We check whether a batch exists first to avoid the more expensive checks whenever possible.
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    TopicPartition part = entry.getKey();
                    Node leader = cluster.leaderFor(part);
                    if (leader == null) {
                        // This is a partition for which leader is not known, but messages are available to send.
                        // Note that entries are currently not removed from batches when deque is empty.
                        unknownLeaderTopics.add(part.topic());
                    } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);  
}



    public final static class ReadyCheckResult {
        public final Set readyNodes;
        public final long nextReadyCheckDelayMs;
        public final Set unknownLeaderTopics;

        public ReadyCheckResult(Set readyNodes, long nextReadyCheckDelayMs, Set unknownLeaderTopics) {
            this.readyNodes = readyNodes;
            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
            this.unknownLeaderTopics = unknownLeaderTopics;
        }
    }


//sender线程发送生产者数据
private long sendProducerData(long now) {
   Cluster cluster = metadata.fetch();
   // get the list of partitions with data ready to send(拿出消息累加器的数据)
   RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); 
    
    //删除我们还没有准备节点
   Iterator iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
    
    
    //创建生产者请求	
     Map> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    
    
    
     if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
    	//发送请求
        sendProduceRequests(batches, now);
      
  }


	
    private void sendProduceRequests(Map> collated, long now) {
        for (Map.Entry> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }

	//从给定的记录批次创建生产请求
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List batches) {
         String nodeId = Integer.toString(destination);
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
        
    }


// 我们可以进入NetworkClient类中
   private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
  Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(send);     
   }
//在下面就进入通信部分
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号