栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka源码学习之--生产者原理

kafka源码学习之--生产者原理

1.生产者发送消息

1.1 找到kafka生产者入口–这里通过KafkaTemplate发送消息

点击send()方法,进入doSend()

ProducerRecord主要是创建要发送到 Kafka 的记录,topic需要指定,其他参数可自配

KafkaTemplate通过工厂模式创建kafkaProducer

找到ProducerFactory的实现类进入DefaultKafkaProducerFactory的createProducer方法

找到创建kafkaProducer的位置

在kafkaProducer里面,创建kafkaProduce实例,并且创建了一个sender对象,启动了一个io线程
这个io线程就是kafka发送线程,生产者发送消息时,这个线程会被唤醒

kafkaProducer创建完成之后,在回到消息发送的位置

调用send()方法发送消息

在kafkaProducer sender发送线程被唤醒之前,kafkaProducer的main线程会进行几个逻辑处理

1.拦截器逻辑


走代码中可以看出,生产者可以实现你多个拦截器,形成一个拦截链

发送消息测试

2.用指定的序列化器分别对key,value进行序列化

3.指定分区器

kafka的消息会发送到partiton中,但是具体到哪个分区器,可以自己指定,没有指定的话,kafka已经实现自动分配,分4中情况
1.在指定partition情况下,直接将指定的partition作为partiton值

2.没有指定partiton但是自定义了分区器


3.没有指定partiton但是存在key的情况下,会使用默认的分区器DefaultPartitioner

4.在既没有指定partiton也没有指定key的情况下,第一次调用时,会随机生成一个整数(之后每次调用在这个基础上自增)然后将这个值对当前topic可用的partiton取余(轮询round-robin)

4.消息累加器

分区选择完成后,并没有立即将消息发送出去,而是把消息放进了一个累加器(ConcurrentMap)缓存起来,什么时候发送跟batch-size有关


kafka尽可能的保证消息先往一个分区里发送,当这个分区的batch-size已经不足以在继续追加存放消息,kafka会考虑更换一个pattition分区继续往该分区的的累加器里累加消息,如果再次追加失败,这个时候kafka会新建batch,不会在尝试了,新建与否通过这个参数控制abortForNewBatch

最后唤醒sender线程发送消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/632743.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号