是分布式的基于发布/订阅模式的消息中间件,主要应用在大数据领域。
kafka主要还是主动拉取数据
1.2 消息队列的两种模式1)点对点( 一对一 消费者主动拉取数据 消息收到后消息清除 )
消息生产者发送消息到队列 ,消费者从队列中获取消息。 消息被消费后 队列中不在有存储 所以消息消费者不可能消费到已经被消费的消息
2)发布/订阅模式 (一对多 消费者消费数据后不会清除数据 支持主动拉取数据和推送 两种消费方式 )
消息生产者将消息发送到topic中,同时又多个消息消费者消费该消息。和点对点不同,发送到topic的消息会被所有订阅者消费
1.3 kafka中的角色(1) product : 消息提供者
(2) consumer : 消息消费者
(3) consumer group : 消费者组 这是kafka消息队列特有的角色,它是一堆消费者 组合成的。消费者组中的每个消费者负责消费不同分区的数据,一个分区只能由一个消费者组中的一个消费者来消费,它们互不影响。
(4) topic:可以看做是一个队列。
(5) partition : 分区 一个topic 可以分为多个partition, topic 消息保存在各个partition上
2.1 kafka 工作流程及文件存储kafka中消息存储是存储在文件中、 kafka存储文件分为 .log 和 .index
.log 文件:存储的消费者发送过来的元数据
.index :存储的是消息在 .log 文件中的偏移量
通俗点将 : log文件存储的是实际的数据、index文件左边存储的是消息的偏移量(也就是第几条消息)、而index文件右边存储的是这条消息存储在log文件中的起始位置、根据这个位置就可以找到实际数据。
2.2 kafka 生产者 2.2.1 分区策略在kafka提供的API中,生存者发送消息时需要传递三个参数:
(1)String value 要发送的消息。
(2)String key kafka自定义的分区。
(3)Integer partition 用户指定的分区。
kafaka的分区原则就和这三个参数有关,大致如下:
(1)如果传递时指定了partition的值,那么kafka会往指定的分区发送数据。
(2)如果没有指定partition的值,但是传递了key,kafka会自动生成一个partition值,它是将key的hash值与topic的partition值进行取余,得到的值就是kafka生成的partition的值。
(3)如果partition和key都没有进行传递,kafka则会随机生成一个key,是一个整数,以后每次生成都是在这个整数上加一。
2.2.2 数据可靠性保证1)消费者消息可靠性保证
为了保证product(发送者) 发送的消息能可靠的发送到指定的topic、topic的每个partition(分区) 收到product 发送的消息后 都需要向product 发送ack,如果product收到ack ,就会进行下一轮的发送 否则重新发送消息。
**在kafka中向生产者发送ack的方案有两种**
方案一、当partition收到消息超过半数以上,就发送ack
优点:延迟低
缺点:选举新的 leader 时,容忍n台节点的故障,需要2n+1 个副本
方案二、只有当partition全部同步完成,才发送ack
优点:选举新的 leader 时,容忍n台节点的故障,需要n+1 个副本
缺点:延迟高
在保证消息可靠性 kafka选择了方案二、但是方案二存在当我在同步数据时 分区挂掉一个 这个时候永远不可能发送ack,为了解决这个问题kafka采用了ISR.
2) ISR(Kafka 动态维护一组同步副本 (ISR))
采用第二种方案之后,设想一下情景:leader收到数据,所有follower 都开始同步数据,但是有一个follower,因为故障不能与leader同步 那leader就要一直等待下去。
leader 维护了一个动态的同步副本 意为和leader保持同步的follower集合。当ISR中的follower完成数据同步之后,leader 就会给消息发送者发送ack。如果follower长时间未向leader 同步数据 则该follower将被踢出ISR 该时间阈值由 replica.lag.time.max.ms 参数设定。leader发生故障之后 就会从ISR中选举新的leader
3) ack应答机制
对于一些 不太重要的数据 对数据的可靠性要求不是很高,所以没必要等到ISR中的follower全部接收成功。
所以kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡。
acks :
0 : peoduct 不等待broker的ack ,broker一接收到还没有写入磁盘就已经返回,当broker故障时可能丢失数据。
1 : product 等待broker的ack,只等待leader落盘成功后返回ack 如何在follower同步之前leader故障,会丢失数据。
-1 : 等待leader 和 follower (ISR中的follower) 全部同步完成后才返回ack。可能会造成数据重复
比如:生产者发生消息、消息到达leader 并且follower也同步完成、但是还没发送ack,这时候leader故障了,这时候从follower中选举一个leader,因为生产者没有接收到ack,把消息重新又重新发送了一次,这时候数据就会重复。
HW : 高水位 指的是消费者能见的最大的offset,ISR队列中最小的LEO
LEO : 每个副本的最后一个offset
(1) follower 故障
follower 发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以从新加入ISR。
(2) leader 故障
leader故障之后,会从ISR中选出一个新的leader,之后为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或不重复
2.3 kafka 消费者 2.3.1 消费方式 消费者采用拉取(pull) 的方式拉取数据消费,pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据,针对这点,kafka消费者在消费数据是会传入一个时长参数timeout,如果当前没有数据可以提供消费,消费者会等待一段时间在返回,这段时间即为timeout。
2.3.2 消费者 分区分配策略(1) Range 范围分配策略
range 范围分配策略是kafka默认的分配策略,它可以确每个消费者的消费分区数量是均衡的。注意: range范围分配策略是针对每个topic的。
算法公式:
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n + 1个,剩余消费者消费n个
举个例子:n = 2 , m = 2 、8个分区、有3个消费者。 这时候分配就是 前两个消费者消费3个分区,最后一个消费者消费2个分区
(2) RoundRobin 轮询策略
轮询策略是将消费者组内的消费者以及消费者所订阅的所有topic和partition按照字典序排序(topic和分区的hashcode 进行排序),然后通过轮询的方式逐个将分区一次分配给每个消费者。(如果消费者出现故障、所有分区会进行从新分配消费者)
(3) Stricky 粘性分配策略 (主要目的:分区分配尽量均匀、在发生rebalance的时候、分区的分配尽可能与上次分配保持相同。)
没有发生rebalance时,Stricky粘性分配策略和RoundRobin轮询策略类似
Stricky粘性分配策略不会像轮询策略把所有分区进行重新分配,会保留rebalance之前的结果。只是将出现故障的消费者消费的分区进行重新分配。以此来减小系统资源的浪费。
2.3.3 消费者offset存储kafka在0.9版本之前,消费者offset存储在zk中,路径是 /consumers/[groupId]/offsets/[topic]/[partition]
0.9版本之后将offset放在kafka内置的一个topic中,该topic是__consumer_offsets,消费该topic需要修改consumer.properties文件
exclude.internal.topics=false
也是根据[groupId,topic,partition]来划分的
[root@localhost data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group --topic stable-test [2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner test-consumer-group stable-test 0 601808 601808 0 none test-consumer-group stable-test 1 602826 602828 2 none test-consumer-group stable-test 2 602136 602136 0 none
- Group : 消费者组
- Topic : topic的名字
- Pid : partition的ID
- Offset : kafka消费者在对应分区上已经消费的消息数【位置】
- logSize : 已经写到该分区的消息数【位置】
- Lag : 还有多少消息未读取(Lag = logSize - Offset)
- Owner : 分区创建在哪个broker
offset更新的方式,不区分是用的哪种api,大致分为两类:
- 自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。
- 手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。
如图所示:当消息生产者发送消息到broker、并且也写入了磁盘,因为某些原因,broker在返回ack时出现故障,这时生产者在次发送消息到broker,这就会造成数据重复。
幂等:
kafka为了实现幂等性 引入了produce ID(pid) 和 Sequence Number。
PID : 每个produce在初始化时,都会分配一个唯一的pid,这个pid 对用户来说,是透明的。
Sequence Number :针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
具体做法如下:
生产者把消息发送到broker,并且携带 pid 和 seq,把消息和pid seq 保存在分区中,当生产者没有收到broker发来的ack时,在次发送消息,同样还是会把pid 和 seq发送到broker,消息在保存到分区之前会去磁盘上比较有没有跟当前消息一样的pid和seq,如果有、就把当前消息直接丢弃,反之 保存。
事务:



