Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于 大数据实时处理领域。
kafka同步处理和异步处理的模型:
2.使用消息队列的优点
异步处理的优点:
1.解耦
允许独立的修改写入mq和页面响应注册的代码
2.可恢复性
系统不会因为一部分的组件失效而是全部的系统功能都受到影响,及时系统挂掉,消息队列也可以处理数据
3.缓冲
可处理生产消息和消费消息的速度不一致的场景
4.灵活性 & 峰值处理能力
在访问量剧增的情况下,消息队列可以顶住突发的访问压力,不会使得访问突增使系统崩溃。
5.异步通信
3.消息队列发布、订阅的两种方式 (1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。 消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式
(一对多,消费者消费数据之后不会清除消息) 消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
可以细分为消费者主动拉取和队列主动推送消息
kafka用的是前者
主动推的方式可能会因为消费者的处理能力不同而造成消费者崩溃或者资源浪费
消费者拉的方式需要维护一个长轮询,这个轮询在没有消息的时候会浪费一定的资源
4.kafka架构模型在kafka集群中维护了若干台kafka的主机(brokers)每台brokers中会有不同的主题,用于对不同的消息进行分类
生产者生产消息,会推入不同的brokers中对应的topic内,注意不是传给某一台机器,每台机器中都会有相应的topic
另外对于一台机的同一个topic,其有多个分区(partition),但这些分区中只有一个可以作为Leader,其余的豆浆作为follower
注意follower和Leader针对的是同一台机,集群中是可以有多个Leader的,但是每个topic在一台机上只有一个leader
leader挂掉后,由follow中选取一台作为新的leader
同一个消费者组里面的消费者不可以消费同一个分区
不同消费者组里面的消费者可以消费同一个分区
消费者组的存在可以提高消息的消费能力
zookeeper可以监控kafka集群中主机的注册情况
0.9版本及以前消息的偏移量存储在kafka,0.9以后存储在kafka
偏移量可用于确认消费者的消费位置,用于满足分布式环境下多个消费者消费消息队列的需求
5.kafka的常用命令1.查看当前服务器中的所有topic
bin/kafka-topics.sh --zookeeper 主机名称:(zookeeper端口号)2181 --list
2.创建topic
bin/kafka-topics.sh --zookeeper 主机名称:2181 --create --replication-factor 3 --partitions 1 -- topic first
指令解析:
-- zookeeper 主机名称+端口号 制定相应主机上的zookeeper监测kafka集群
-- create 表示创建命令
-- replication-factor 表示定义的副本数,即在集群中的所有主机中,对于该topic一共保存多少个副本
-- partitions 表示分区数
-- topic 表示topic的名称
3. 查看日志
cd logs --> vim server.log
4. 删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
5. 发送消息
bin/kafka-console-producer.sh --broker -list 主机名称:端口号 --topic first
指令解析:
-producer.sh 表示是一个生产者进程
--broker -list 主机名:端口号 表示你要把消息打到kafka集群中的哪一台机器
-- topic first 表示你要把消息打入到这台机器的名为first的topic中
之后出现小箭头>
在里面输入你的消息
6. 消费消息
bin/kafka-console-consumer.sh --zookeeper 主机名:2181 --topic first
--zookeeper 通过主机名和端口号制定对应的zookeeper,需要与前面创建topic first时的zookeeper的信息相同
-- topic first 表示消费名称为first的消息队列
可加 --from-beginning 表示把topic中的全部消息都读出来
7. 查看某个topic的详情
bin/kafka-topics.sh --zookeeper 主机名:2181 --describe --topic first
8. 修改分区数
bin/kafka-topics.sh --zookeeper 主机名:2181 --alter --topic first --partitions 6
6.数据日志分离cd /opt/moudles/kafka/config
vi server.properties
将log.dirs进行修改,改为自己设置的专门用于存储数据的文件目录
log目录kafka会自己创建,此时数据和日志分离
7.kafka工作流程Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。 topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
消息只能保证在每个分区内是有序的,并不能保证在全局是有序的。
为防止log过大导致数据定位效率低,kafka采用分片和索引的机制,将每个分区分为多个segment,每个segment对应一个.index文件 .index文件存放 偏移量 - 起始地址的映射,每一个偏移量对应的分片大小恒定,因此就医定位到每个message的位置。
8.kafka分区原则(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后 面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
9.kafka数据可靠性保证为了保证生产者生产的消息被topic接收,每当topic收到一条生产者的消息后,必须回复一个ack,生产者在收到ack后可以发送下一条消息,否则需要重新发送消息
那么何时回复ACK消息,只有当follow完成与leader的同步时,才可以发送ack,这样即使leader挂掉,也可以选举出新的follower作为leader。
1.副本数据同步策略1)半数以上完成同步,就发送ACK:只要有超过一半的follower完成了与leader的同步,就进行ack的发送。
优点:延迟较低 缺点:选举新的leader时,为了容忍n台节点的故障,需要有2n+1个副本
2)全部完成同步,才发送ACK:全部的follower均需要完成与leader的同步、
优点:选举新的leader时,容忍n台节点的故障,需要n+1个副本 缺点:延迟高
在第2)种同步策略下,一旦有一台Leader挂掉导致一直不能完成同步,那么会影响工作效率。
因此Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集 合。当 ISR 中的 follower 完成数据的同步之后,leader就会给 follower 发送 ack。如果 follower 长时间未向leader同步数据 ,则该follower将被踢出ISR ,该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。被移出去的主机,在其同步消息的条数和交互时间达到加入ISR的要求后会重新将其拉入ISR。
2.ack应答机制0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1:producer 等待 broker 的 ack,partition的leader落盘成功后返回 ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):producer等待broker的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
3.高水位和终止偏移量终止偏移量LEO指的是每个分区同步的数据的最大的偏移量
高水位指的是每个分区确定都能同步到的偏移量,防止因数据不同步而报错
10.消费者部分 10-1 消费方式:
kafka采用消费者主动拉取数据的模式,但在这个模式下,如果kafka中没有数据,那么消费者将一直循环轮询,会一直返回空数据,因此消费者在消费消息时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间再返回,这个等待的时间就是timeout。
由于一个消费者组中有多个消费者,一个主题又有多个分区,每个分区不可以被同一个消费者组里的多个消费者消费,因此分区与消费者之间的分配需要配置一定的策略。
kafka有两种分配策略,一是RoundRobin,二是Range。
1)RoundRobin:所有主题中的所有分区作为一个整体,每个分区会计算出一个hash值进行排序,然后消费者组里的每个消费者轮询这个整体,选取一个或多个分区进行消费。 缺点:一个消费者有可能轮询到它没有订阅的分区。
2)Range:
kafka默认的使用的分区策略。将所有的主题的所有的分区作为一个整体,按一定的范围进行划分这个整体,即按照消费者的个数划分n个不同的范围,每个范围中的分区分配给对应的消费者来进行消费。
10-2消费者offset的维护:
group + topic + partition 唯一确定一个offset
读取offset的指令:
0.11.0.0版本之前:
bin/kafka-console-consumer.sh --topic __consumer_offsets -- zookeeper 主机名:2181 --formatter "kafka.coordinator.GroupmetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
0.11.0.0版本及以后:
bin/kafka-console-consumer.sh --topic __consumer_offsets -- zookeeper 主机名:2181 --formatter "kafka.coordinator.group.GroupmetadataManager$OffsetsMessageForm atter" --consumer.config config/consumer.properties --frombeginning



