栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka源码解析与实战(kafka producer配置)

kafka源码解析与实战(kafka producer配置)

一、为什么需要消息系统
1.解耦:
  允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2.冗余:
  消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3.扩展性:
  因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
4.灵活性 & 峰值处理能力:
  在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5.可恢复性:
  系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6.顺序保证:
  在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7.缓冲:
  有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8.异步通信:
  很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
二、结构 2.1 拓扑结构

三、Producer端

Kafka 存储的消息来自任意多被称为 Producer 生产者的进程。数据从而可以被发布到不同的 Topic 主题下的不同 Partition 分区。在一个分区内,这些消息被索引并连同时间戳存储在一起。其它被称为 Consumer 消费者的进程可以从分区订阅消息。Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群结点分布。

Producer端流程图

3.1 生产端流程 3.1 初始化KafkaProducer 3.1.1、初始化clientid 、transactional.id

组成幂等,保证消息发送的唯一性

client.id
transactional.id
String clientId = config.getString("client.id");
            if (clientId.length() <= 0) {
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            }
String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;

3.1.2、创建一个ProducerMertrict监控,进行指标的统计
metrics.num.samples:维护用于计算度量metrics的样本数量。
metrics.sample.window.ms:计算度量样本的时间窗口,度量用于kafka监控
public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
        Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
        produceThrottleTimeSensor.add(metrics.produceThrottleTimeAvg, new Avg());
        produceThrottleTimeSensor.add(metrics.produceThrottleTimeMax, new Max());
        return produceThrottleTimeSensor;
    }
3.1.3、构造了一个分区器partitioner 3.1.4、初始化内存缓冲RecordAccumulator

核心参数:

max.request.size:单条记录最大;默认值为1048576B,1MB
retry.backoff.ms	重试间隔时间
buffer.memory 缓冲区的大小
enable.idempotence:是否开启幂等
compression.type:NONE,GZIP,SNAPPY,LZ4
batch.size:每批次发送的最大batchsize默认16k
linger.ms:最大等待时间
3.1.5、初始化一个metadata组件

第一次初始化的时候不会去集群拉取元数据

bootstrap.servers:集群地址
metadata.max.age.ms:元数据最大生命周期(过期时间)5min
max.block.ms:最大阻塞时间
if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new metadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), true, true, clusterResourceListeners);
                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), this.time.milliseconds());
            }
初始化一个metadata组件,构建一个metadata,用来从broker集群拉取元数据  获取topic的leader + follower,ISR,通过metadata知道有哪些partitions,然后找到对应的partition的leader,每隔一段时间刷新元数据,直接调用metadata组件的方法去broker上面拉了一次集群的元数据,后面默认5分钟拉一次,但是在发送消息的时候,如果没有找到某个topic的元数据时,也会主动去拉一次
3.1.6、初始化一个Sender(Runnable KafkaThread)
security.protocol:安全协议
sasl.mechanism:认证方式
String ioThreadName = "kafka-producer-network-thread | " + clientId;

3.1.7、将上述配置整合初始化一个Sender(Runnable KafkaThread)

用于从缓冲区将数据处理成batch发送到broker

总结
1、初始化构建唯一ID
2、初始化监控项
3、初始化Partitioner
4、初始化内存缓冲RecordAccumulator
5、初始化一个metadata组件,构建一个metadata,用来从broker集群拉取元数据  获取topic的leader + follower,ISR,通过metadata知道有哪些partitions,然后找到对应的partition的leader,每隔一段时间刷新元数据,直接调用metadata组件的方法去broker上面拉了一次集群的元数据,后面默认5分钟拉一次,但是在发送消息的时候,如果没有找到某个topic的元数据时,也会主动去拉一次
6、创建一个消息channel

2、
3、初始化一些核心参数,	缓冲区内存大小(32m),每个请求的最大大小(1mb),重试时间间隔(100ms),
4、初始化的时候,
5、核心组件:网络通信的组件,NetworkClient,一个网络连接最多空闲9分钟,每个链接最多有几个request没收到响应(5个),重试连接的时间间隔,scoket发送缓冲区大小(128kb),接收缓冲区大小
6、Sender线程:负责从缓冲区里获取消息进行batch的分配发送到broker
3.2、客户端发送信息 3.2.1、ProducerInterceptor拦截器
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会
对消息做一些定制化需求,比如 修改消息等。同时,producer 允许用户指定多个 interceptor
按序作用于同一条消息从而形成一个拦截链(interceptor chain)
onsend:在消息序列化之前执行用户定制的逻辑方法
3.2.2、waitonmetadata 等待元数据拉取
如果之前从来没有加载过topic的元数据,就会在这一步同步阻塞来等待去broker拉取元数据,如果因为网络问题或者一些异常导致在一段时间还是拉取不到,就会报错
在你把数据放到内存缓冲的时候,如果内存缓冲区满了,此时最多就只能阻塞这么长时间就必须返回了,如果你希望send()方法被阻塞的时间延长,可以手动配置(max.block.ms)

在客户端的方法尝试等待获取topic元数据的过程中,核心逻辑,必须先唤醒sender线程,然后通过一个while循环,直接去wait释放锁,尝试最多就是等待默认60s。

private KafkaProducer.ClusterAndWaitTime waitonmetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        Cluster cluster = this.metadata.fetch();
        if (cluster.invalidTopics().contains(topic)) {
            throw new InvalidTopicException(topic);
        } else {
            this.metadata.add(topic);
            Integer partitionsCount = cluster.partitionCountForTopic(topic);
            if (partitionsCount != null && (partition == null || partition < partitionsCount)) {
                return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);
            } else {
                long begin = this.time.milliseconds();
                long remainingWaitMs = maxWaitMs;

                long elapsed;
                do {
                    do {
                        if (partition != null) {
                            this.log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
                        } else {
                            this.log.trace("Requesting metadata update for topic {}.", topic);
                        }

                        this.metadata.add(topic);
                        int version = this.metadata.requestUpdate();
                        this.sender.wakeup();

                        try {
                            this.metadata.awaitUpdate(version, remainingWaitMs);
                        } catch (TimeoutException var15) {
                            throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
                        }

                        cluster = this.metadata.fetch();
                        elapsed = this.time.milliseconds() - begin;
                        if (elapsed >= maxWaitMs) {
                            throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
                        }

                        this.metadata.maybeThrowException();
                        remainingWaitMs = maxWaitMs - elapsed;
                        partitionsCount = cluster.partitionCountForTopic(topic);
                    } while(partitionsCount == null);
                } while(partition != null && partition >= partitionsCount);

                return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);
            }
        }
    }
3.2.3 序列化Key和Value
serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());

3.2.4 确定发送的分区

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其计算机制为:

生产者分区策略是 决定生产者将消息发送到哪个分区的算法,
主要有以下几种:

轮询策略:Round-robin 策略,即顺序分配,
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用)
随机策略: Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上。
消息键保序策略:key-ordering 策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个
Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的。
1. 指定了 patition,则直接使用;
2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement();
        List availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
3.2.5、将消息放入RecordAccumulator缓冲区中
RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);

3.2.6、Sender线程开始处理缓冲区中的数据

唤醒Sender线程的selector.select()的阻塞,开始处理内存缓冲器中的数据。

如果batch满了或者时间到了就将sender唤醒 然后推送
if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
private void sendProduceRequests(Map> collated, long now) {
        for (Map.Entry> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }
accumulator:消息发送规则
max.request.size:请求最大的大小
acks:默认1,
	参数0:发出去就不管了,
	参数1:leader写完就不管了
	参数all:leader写完,超过1个follower  replica完成
retries:重试次数,默认0
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/772188.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号