主线程中经过操作,在内存中根据分区器生成多个DQuence,然后通过sender线程,等到批次数据达到16K的时候,就会来拉取数据。
或者根据linger.ms的数据,按照时间来拉取。
一般来说在生产环境之中这两个值会进行修改。
接下来DQuence分区的数据要发往不同的kafka节点,以不同的节点为key,后面跟随对应请求,可以发送最多五个没有响应的请求,等到第一个请求得到答复,才会继续发送请求
副本同步机制
ask:0不需要反馈
ask:1发送的数据leader收到就可,即使数据同步没有完成也可以
ask:-1/all,需要leader和ISR队列的所有节点收集数据才能应答
之后清理请求以及数据---失败会进行重试
retries:默认int最大值,直到成功
海量数据按照分区进行分装,合理控制分区任务实现负债均衡
(1)便于合理使用存储资源,每一个partiton在一个Broker上存储,可以将海量数据按分区切割成一块一块的数据存储在多个Broker上。合理控制分区任务,实现负载均衡效果
(2)提供并行度,生产者以分区为单位发送数据;消费者以分区为单位进行消费数据。
分区策略:
当没有指定分区的时候,但是有kv值的时候,根据key的hash值发送到对应分区
因此在生产环境之中可以用表名作为一个key值,这样同表的hashcode值是一致的,就会发送到同一个分区之中;
---
自定义分区器:
业务需求,将含有某个关键词的信息分到指定分区之上
1、实现partitioner重写方法
2、
使用场景:过滤目标信息,过滤脏数据
(三)生产者如何提高吞吐量batch.size:批次大小,默认为16k(时效性)
&&
linger.ms:等待时间,修改为5-100ms(单次信息大小)
compression.type:压缩snappy
RecordAccumulator:缓冲区大小,修改为64M(默认为32M)
应答机制:
ask:1比0可靠一些
当ask为-1的时候的数据可靠问题:
为-1的时候数据是十分可靠的,维护了ISR(和leader保持同步的F+L集合)
如果F30s未同步会被踢出ISR,(time.max.ms设定),当分区副本为1的时候相当于就是ack=1
当ask为-1的时候的数据重复性问题:
当ask信息要返回的时候挂掉了,此时已经完成了数据的同步,但是没有返回,所以下一次信息会继续发送,会导致数据的重复发送到kafka中。
消息的幂等性:
每次kafka重启都会富裕一个新的PID,因此kafka只保证在单分区内的会话是不重复的
(单分区单会话不重复)
一个topic 多个分区
PID一致,Partition一致,seqNum一致,就会在内存中直接干掉==》保证消息的幂等性
enable.idempotence
此时还是有问题,当kafka挂掉之后PID重新分配,那么还是会产生重复数据,此时--》生产者事务
(四)生产者事务生产者事务的开启必须开启幂等性
事务在提交过程中需要持久化到硬盘中,(先储存到主题再储存到磁盘里 )
每一个节点都有一个事务协调器,会根据主题
事务的事务id的值找到50个分区的其中一个分区,这个分区leader副本所在的节点就成为了这个事务所对应的事务协调器节点。
生产者必须有一个唯一的事务ID,事务ID会被持久化到主题里,主题会持久化到硬盘,所以事务ID不会发生改变
多分区的数据有序性
多分区的数据有序性,在消费者处排序,损失效率,获得顺序
InFlightRequests默认每一个broker最多缓存5个请求。
在kafka集群之中(由于开启了幂等性,有用一个单调递增的序列号),
即使发送数据的顺序错误了,也会根据单调递增的值来寻找请求的顺序,以此来保证数据的有序性(1,落盘。2,落盘,4到了不落,5不落,3到了落盘,之后4落5落),如果 InFlightRequests改为了6,那么就不一定能保证有序了,因为在broker中最多允许5个未响应的请求。
Zookeeper


