栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka生产者源码解析(一)—,java序列化和反序列化面试题

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka生产者源码解析(一)—,java序列化和反序列化面试题

Time time) {

//创建生产者配置对象

ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,

valueSerializer));

try {

…………………………

//通过反射机制获取到partitioner(分区器)、keySerializer(key序列化器)、valueSerializer(value序列化器)

this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

if (keySerializer == null) {

this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

Serializer.class);

this.keySerializer.configure(config.originals(), true);

} else {

config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);

this.keySerializer = keySerializer;

}

if (valueSerializer == null) {

this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

Serializer.class);

this.valueSerializer.configure(config.originals(), false);

} else {

config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);

this.valueSerializer = valueSerializer;

}

//获取interceptors拦截器,之后KafkaProducer调用send方法后会用到该拦截器

List> interceptorList = (List) configWithClientId.getConfiguredInstances(

ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);

if (interceptors != null)

this.interceptors = interceptors;

else

this.interceptors = new ProducerInterceptors<>(interceptorList);

//创建RecordAccumulator(消息收集器),之后KafkaProducer调用send方法后会用将消息数据存入其中

this.accumulator = new RecordAccumulator(……);

//创建更新Kafka集群的元数据

List addresses = ClientUtils.parseAndValidateAddresses(

config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),

config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));

if (metadata != null) {

this.metadata = metadata;

} else {

this.metadata = new Producermetadata(retryBackoffMs,

config.getLong(ProducerConfig.metaDATA_MAX_AGE_CONFIG),

logContext,

clusterResourceListeners,

Time.SYSTEM);

this.metadata.bootstrap(addresses, time.milliseconds());

}

//创建Sender线程

this.sender = newSender(logContext, kafkaClient, this.metadata);

String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;

//启动Sender对应的线程

this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

this.ioThread.start();

……………………

} catch (Throwable t) {

……………………

}

}

再来看一下是如何创建Sender线程的。

Sender newSender(LogContext logContext, KafkaClient kafkaClient, Producermetadata metadata) {

………………

//创建NetworkClient,这是Kafka网络I/O的核心

KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(………………), ………………);

………………

//返回Sender线程对象

return new Sender(………………);

}

去掉一些非核心代码,发现newSender方法要做的事情其实很简单:创建NetworkClient,这是Kafka网络I/O的核心,在后面发送消息请求时会用到;最后创建Sender对象,Sender实现了Runnable接口,是个线程类。至于Sender线程都做了什么我们现在并不需要太关心,毕竟本文的主角并不是它,我们把它留着在后面的文章中单独分析。

二、send方法探析

==========

构造完KafkaProducer对象之后,接着就会调用它的send方法,所以下面我开始关注send方法。

@Override

public Future send(ProducerRecord record, Callback callback) {

// 拦截器,可在发送消息之前对消息进行拦截修改

ProducerRecord interceptedRecord = this.interceptors.onSend(record);

return doSend(interceptedRecord, callback);

}

可以看到在发送消息之前,我们可以利用之前获取的拦截器对消息进行拦截修改,然后调用了一个doSend方法,该方法将会完成更新kafka集群元数据信息、对Key和Value进行序列化、分区选择、追加消息到RecordAccumulator消息累加器中、唤醒Sender线程的操作。下面将围绕这些内容进行分析。

1、ProducerInterceptors消息拦截器

ProducerInterceptors其实是一个ProducerInterceptor拦截器的集合,它的onSend方法只不过是在循环遍历这些拦截器,并调用每个拦截器的onSend方法,源码如下:

public ProducerRecord onSend(ProducerRecord record) {

ProducerRecord interceptRecord = record;

//循环遍历拦截器

for (ProducerInterceptor interceptor : this.interceptors) {

try {

//调用每个拦截器的onSend方法

interceptRecord = interceptor.onSend(interceptRecord);

} catch (Exception e) {

………………

}

return interceptRecord;

}

ProducerInterceptor是一个接口,所以如果我们需要写自己的拦截逻辑时,只需要去实现这个接口,将自己的拦截逻辑放在onSend方法中即可。

2、Kafka集群元数据信息更新

消息经过拦截修改后进入到doSend方法,若没有指定分区,后面将会使用Cluster信息计算分区号,因此在此之前需要获取最新的Cluster集群信息。下面是doSend方法中涉及到元数据信息更新的代码部分,其余部分省略。

private Future doSend(ProducerRecord record, Callback callback) {

………………

ClusterAndWaitTime clusterAndWaitTime;

try {

//等待元数据更新

clusterAndWaitTime = waitonmetadata(record.topic(), record.partition(), maxBlockTimeMs);

} catch (KafkaException e) {

………………

}

//获取到Cluster集群最新信息

Cluster cluster = clusterAndWaitTime.cluster;

………………

//计算分区号

int partition = partition(record, serializedKey, serializedValue, cluster);

………………

}

进入waitOnmetadata方法源码,可以看到这里的逻辑主要是判断metadata中的元数据信息是否需要更新,当需要更新时,则通过do-while循环进行更新,其中核心部分是通过**metadata.awaitUpdate()**方法阻塞当前线程,等待Sender线程向远程服务器发起元数据更新请求,直到远程服务器返回了新的元数据信息才唤醒当前线程,最终返回最新的cluster元数据信息。

private ClusterAndWaitTime waitonmetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {

//通过metadata获取cluster信息, metadata之前已经在KafkaProducer构造方法中获取到

Cluster cluster = metadata.fetch();

……………………

//将topic加入到metadata中进行维护

metadata.add(topic);

//从cluster信息中获取topic的分区数

Integer partitionsCount = cluster.partitionCountForTopic(topic);

//如果partitionsCount不为空则说明metadata中已经维护了该topic的元数据,并且需要更新的分区号未定义或者在已知的分区范围内

//则直接返回metadata中的cluster信息

if (partitionsCount != null && (partition == null || partition < partitionsCount))

return new ClusterAndWaitTime(cluster, 0);

………………

//如果metadata中没有维护该topic的元数据,或者需要更新的分区号是新的时,则进行metadata的更新。

//do-while循环更新

do {

…………

//将topic加入到metadata中进行维护

metadata.add(topic);

//获取当前元数据版本号

int version = metadata.requestUpdate();

//唤醒sender线程

sender.wakeup();

try {

//阻塞等待元数据更新结束

metadata.awaitUpdate(version, remainingWaitMs);

} catch (TimeoutException ex) {

……………………

}

//拿到更新后的集群信息

cluster = metadata.fetch();

elapsed = time.milliseconds() - begin;

//检测超时时间

if (elapsed >= maxWaitMs) {

……………………

}

……………………

} while (partitionsCount == null || (partition != null && partition >= partitionsCount));

//返回更新后的cluster信息

return new ClusterAndWaitTime(cluster, elapsed);

}

3、Serializer序列化器

Kafka发送的消息是在网络上进行传输,所以,doSend方法还会通过keySerializer和valueSerializer将我们的消息进行序列化。producer端需要序列化,consumer端需要反序列化。下面是doSend方法中涉及到消息序列化的代码部分,其余部分省略。

private Future doSend(ProducerRecord record, Callback callback) {

………………

byte[] serializedKey;

try {

//使用keySerializer将key进行序列化

serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

} catch (ClassCastException cce) {

………………

}

byte[] serializedValue;

try {

//使用valueSerializer将value进行序列化

serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

} catch (ClassCastException cce) {

………………

}

………………

}

4、Partitioner分区器

我们

【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】

浏览器打开:qq.cn.hn/FTf 免费领取

的消息最终都会发往一个合适的分区,如果我们在ProducerRecord消息记录中已经给partition字段指定好了分区号,那么将会优先选择此分区,否则将会通过**partitioner.partition()**方法为我们选择一个合适的分区。下面是doSend方法中涉及到计算分区的代码部分,其余部分省略。

private Future doSend(ProducerRecord record, Callback callback) {

………………

//计算分区

int partition = partition(record, serializedKey, serializedValue, cluster);

………………

}

进入partition方法。

private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

//获得ProducerRecord中的partition字段值

Integer partition = record.partition();

//如果ProducerRecord中partition字段已经设置了分区号,则直接返回该分区号,否则调用分区器进行计算合适的分区号

return partition != null ?

partition :

partitioner.partition(

record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

}

好家伙,里面还有个partition方法,继续进入核心的**partitioner.partition()**方法:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

//从cluster中获取topic的分区信息

List partitions = cluster.partitionsForTopic(topic);

//获得分区数量

int numPartitions = partitions.size();

//如果消息没有key

if (keyBytes == null) {

//递增counter,用于后面取模运算

int nextValue = nextValue(topic);

//选择availablePartitions

List availablePartitions = cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) {

int part = Utils.toPositive(nextValue) % availablePartitions.size();

return availablePartitions.get(part).partition();

} else {

//返回一个不可用的分区

return Utils.toPositive(nextValue) % numPartitions;

}

} else {

// 如果消息有key的情况

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/489785.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号