- Kafka中的ISR、AR又代表什么?
- Kafka中的HW、LEO等分别代表什么?
- Kafka中是怎么体现消息顺序性的?
- Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
- Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
- “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
- 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
- 有哪些情形会造成重复消费?
- 那些情景会造成消息漏消费?
- 当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
- topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
- topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
- Kafka有内部的topic吗?如果有是什么?有什么所用?
- Kafka分区分配的概念?
- 简述Kafka的日志目录结构?
- 如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
- 聊一聊Kafka Controller的作用?
- Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
- 失效副本是指什么?有那些应对措施?
- Kafka的那些设计让它有如此高的性能?
- Kafka架构
- Kafka的机器数量
- 副本数设定
- Kafka压测
- Kafka日志保存时间
- Kafka中数据量计算
- Kafka的硬盘大小
- Kafka监控
- Kakfa分区数
- 多少个Topic
- Kafka的ISR副本同步队列
- Kafka分区分配策略
- Kafka挂掉
- Kafka丢不丢数据
- Kafka数据重复
- Kafka消息数据积压,Kafka消费能力不足怎么处理?
- Kafka参数优化
- Kafka高效读写数据
- Kafka单条日志传输大小
- Kafka过期数据清理
- Kafka可以按照时间消费数据
- Kafka消费者角度考虑是拉取数据还是推送数据
- Kafka中的数据是有序的吗
ISR:与leader保持同步的follower集合
AR:分区的所有副本
LEO:没个副本的最后条消息的offset
HW:一个分区中所有副本最小的offset
每个分区内,每条消息都有一个offset,故只能保证分区内有序。
Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?拦截器 -> 序列化器 -> 分区器
Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么? “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?正确
消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?offset+1
有哪些情形会造成重复消费?先提交消费,后提交offset,有可能造成数据的重复
那些情景会造成消息漏消费?先提交offset,后消费,有可能造成数据漏消费
当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?- 会在 zookeeper 中的 /brokers/topics 节点下创建一个新的 topic 节点,如:/brokers/topics/first
- 触发 Controller 的监听程序
- kafka Controller 负责 topic 的创建工作,并更新 metadata cache
可以增加
bin/kafka-topics.sh --bootstrap-server cpucode100:9091 --alter --topic topic-config --partitions 3topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不可以减少,现有的分区数据难以处理。
Kafka有内部的topic吗?如果有是什么?有什么所用?__consumer_offsets, 保存消费者offset
Kafka分区分配的概念?一个topic多个分区,一个消费者组多个消费者,故需要将分区分配个消费者(roundrobin、range)
简述Kafka的日志目录结构?每个分区对应一个文件夹,文件夹的命名为 topic-0,topic-1,内部为 .log 和 .index 文件
如果我指定了一个offset,Kafka Controller怎么查找到对应的消息? 聊一聊Kafka Controller的作用?负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。
Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?partition leader(ISR),controller(先到先得)
失效副本是指什么?有那些应对措施?不能及时与 leader 同步,暂时踢出 ISR,等其追上 leader 之后再重新加入
Kafka的那些设计让它有如此高的性能?分区,顺序写磁盘,0-copy
Kafka架构生产者、Broker、消费者、ZK;
注意:Zookeeper 中保存 Broker id 和消费者 offsets 等信息,但是没有生产者信息。
Kafka机器数量 = 2 *(峰值生产速度 * 副本数 / 100)+ 1
副本数设定一般我们设置成 2个 或3个,很多企业设置为 2个
副本的优势:提高可靠性;副本劣势:增加了网络IO传输
Kafka压测Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)
Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈
默认保存7天;生产环境建议3天
Kafka中数据量计算每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:50条
高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条
每条日志大小:0.5k - 2k(取1k)
每秒多少数据量:2.0M - 20MB
每天的数据量100g * 2个副本 * 3天 / 70%
Kafka监控公司自己开发的监控器
开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle
Kakfa分区数1)创建一个只有1个分区的topic
2)测试这个topic的producer吞吐量和consumer吞吐量。
3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区
https://blog.csdn.net/weixin_42641909/article/details/89294698
分区数一般设置为:3-10个
多少个Topic通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的
Kafka的ISR副本同步队列ISR(In-Sync Replicas),副本同步队列
ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。
有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中
Kafka分区分配策略在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽
- C1-0 将消费 0, 1, 2, 3 分区
- C2-0 将消费 4, 5, 6 分区
- C2-1 将消费 7, 8, 9 分区
第一步:将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
Kafka挂掉- Flume记录
- 日志有记录
- 短期没事
- Ack = 0,相当于异步发送,消息发送完毕即offset增加,继续生产
- Ack = 1,leader收到leader replica 对一个消息的接受ack才增加 offset,然后继续生产
- Ack = -1,leader收到所有replica 对一个消息的接受ack才增加 offset,然后继续生产
幂等性 + ack-1 + 事务
Kafka数据重复,可以再下一级:SparkStreaming、redis或者Hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值
Kafka消息数据积压,Kafka消费能力不足怎么处理?-
如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
-
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。
- Broker参数配置(server.properties)
- 日志保留策略配置
保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
- Replica相关配置
default.replication.factor:1 默认副本1个
- 网络通信延时
#当集群之间网络不稳定时,调大该参数 replica.socket.timeout.ms:30000 # 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数 replica.lag.time.max.ms= 600000
- Producer优化(producer.properties)
#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力 compression.type:none gzip snappy lz4
- Kafka内存调整(kafka-server-start.sh)
# 默认内存1个G,生产环境尽量不要超过6个G export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"Kafka高效读写数据
-
Kafka本身是分布式集群,同时采用分区技术,并发度高
-
顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s -
零复制技术
Kafka 对于消息体的大小默认为单条最大值是 1M , 但是在我们应用场景中,常常会出现一条消息大于1M,如果不对 Kafka 进行配置。则会出现生产者无法将消息推送到 Kafka 或 消费者无法去消费 Kafka 里面的数据
这时我们就要对 Kafka 进行以下配置:server.properties
# broker可复制的消息的最大字节数, 默认为1M replica.fetch.max.bytes: 1048576 # kafka 会接收单个消息size的最大限制, 默认为1M左右 message.max.bytes: 1000012
注意:message.max.bytes 必须 <= replica.fetch.max.bytes ,否则就会导致 replica 之间数据同步失败。
Kafka过期数据清理保证数据没有被引用(没人消费他)
日志清理保存的策略只有 delete 和 compact 两种
- log.cleanup.policy = delete 启用删除策略
- log.cleanup.policy = compact 启用压缩策略
https://www.jianshu.com/p/fa6adeae8eb5
Kafka可以按照时间消费数据MapKafka消费者角度考虑是拉取数据还是推送数据startOffsetMap = KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp);
拉取数据
Kafka中的数据是有序的吗- 单分区内有序
- 多分区,分区与分区间无序
扩展:
kafka producer 发送消息的时候,可以指定key
这个key的作用是为消息选择存储分区,key可以为空,当指定key且不为空的时候,Kafka 是根据 key 的 hash 值与分区数取模来决定数据存储到那个分区。
有序解决方案:同一张表的数据 放到 同一个 分区
- ProducerRecord里传入 key,会根据 key 取 hash 算出分区号
- key 使用表名,如果有库名,拼接上库名



