- 一、安装
- 二、相关命令
- 三、偏移量(offset)和顺序消费
- 四、单播和多播消息
- (1)单播消息
- (2)多播消息
- (3)简易消费模型流程
- 五、消费组概念
- 六、kafka的分区(partitions)
- (1)给主题创建多个分区
- (2)__consumer_offsets 文件存储的内容
- 七、搭建kafka集群
- 七、集群消费消息
- (1)分区和副本
- (2)集群发送消费消息
本次安装环境为macos,安装为单机版本(非集群),安装方式为brew
-
安装
brew install kafka
-
修改配置文件
vi /usr/local/etc/kafka/server.properties,将listeners释放,并且添加host
-
启动zookeeper
brew services start zookeeper -
启动kafka
brew services start kafka -
测试是否启动成功
- 创建名为test的topic:kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 查看当前的topic:kafka-topics --list --zookeeper localhost:2181
- 创建生产者:kafka-console-producer --broker-list localhost:9092 --topic test
- 创建消费者:kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
- 删除topic:kafka-topics --delete --topic test --zookeeper localhost:2181
kafka目录:/usr/local/etc/kafka
kafka配置:server.properties
kafka备份存储位置:log.dirs标签后配置
- 查看topic描述:
kafka-topics --zookeeper localhost:2181 --topic test --describe # 查看描述
-
创建topic:
kafka-topics --create --zookeeper localhost:2181 --topic kafka_topic --replication-factor 1 --partitions 1 # 创建几个副本、指定分区数 -
消费者:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning # 7天内的第一条消息开始消费
进入 server.properties 中 log.dirs 存储目录
进入 test-0 目录
- 偏移量(offset)
1)偏移量就是记录消息的下标 - 顺序消费
2)生产者发送消息时,保证消费的顺序消费(先发先接受),使用offset偏移量描述消息的顺序。
3)消费者消费消息时,通过 offset偏移量 来描述需要消费到那条消息的位置。
简易的消费模型如下:
在同一个kafka的一个tiopc中,两个消费者,一个生产者。
两个消费者在同一个消费组中(–consumer-property group.id=group1),只会有一个消费者可以订阅到topic中的消息。
在同一个kafka的一个tiopc中,两个消费者,一个生产者。
不同的消费组订阅同一个topic,这样多个消费组中的多个消费者都可以收到订阅消息.
五、消费组概念一个消费组中只会有一个消费者订阅一个topic的消息
kafka-consumer-groups --bootstrap-server localhost:9092 --group group2 --describe
kafka-topics --create --zookeeper localhost:2181 --topic test_partit --replication-factor 1 # 一个副本 --partitions 2 # 两个分区
进入目录:/usr/local/var/lib/kafka-logs,可见主题:test_partit有两个分区(test_partit-0、test_partit-1)
- __consumer_offsets:kafka自己创建的__consumer_offsets主题包含50个分区(可在配置文件中设置分区数),这个主题用来记录消费者消费某个主题的偏移量。
七、搭建kafka集群
- 用处:消费者会定期将自己消费的分区的offset提交给kafka自身的topic:__consumer_offsets中
- 存储:保存的key: groupid+topic+分区号,val: 当前的offset
- 分区规则: hash(groupid) % __consumer_offsets数量
/usr/local/etc/kafka
(1) 准备三个 server.properties 修改内容
-
server.properties
broker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=/usr/local/var/lib/kafka-logs
-
server1.properties
broker.id=1 listeners=PLAINTEXT://localhost:9093 log.dirs=/usr/local/var/lib/kafka-logs-1
-
server2.properties
broker.id=2 listeners=PLAINTEXT://localhost:9094 log.dirs=/usr/local/var/lib/kafka-logs-2
(2) 启动三个kafka服务
- kafka-server-start -daemon server.properties
- kafka-server-start -daemon server1.properties
- kafka-server-start -daemon server2.properties
验证是否启动成功:
- ps -ef | grep 9094 查看 端口9094 进程是否存在
- 进入 /usr/local/var/lib 查看 日志文件 夹是否存在
kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 # 3个副本 --partitions 2 # 2个分区 --topic test_factor
主题信息:
集群副本和分区数据同步图
- leader
kafka的读和写都发生在leader上,可以理解为主副本文件,由leader负责同步给各个子副本。leader挂了会重新选举新的节点中的副本为主节点。 - isr
可以同步的和以同步的节点号会被存到isr中。当某个节点性能太差,将会从isr中删除。
生产者:kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test_factor
消费者:
# 消费组1
kafka-console-consumer --bootstrap-server localhost:9092,localhost:9093,localhost:9094
--from-beginning
--consumer-property group.id=group1
--topic test_factor
# 消费组2
kafka-console-consumer --bootstrap-server localhost:9092,localhost:9093,localhost:9094
--from-beginning
--consumer-property group.id=group2
--topic test_factor
集群消费分区:
- 一个分区只能被一个消费组中的一个消费者消费:目的是为了保证消费顺序性,该顺序只是一个消费者消费对应分区的顺序性,多个分区总消费顺序无法保证
- 分区数量决定消费组中:消费者数量最多等于分区数量,多的消费者保持空闲



