这里用的kafka是二进制安装包
[root@oracle ~]# tar xf kafka_2.12-2.8.1.tgz -C /usr/local/
[root@oracle ~]# cd /usr/local/
[root@oracle local]# mv kafka_2.12-2.8.1/ kafka
[root@oracle ~]# cd /usr/local/kafka/config
[root@oracle config]# vim server.properties
log.dirs=/usr/local/kafka/logs/
#修改本机的监听IP地址,只修改端口和IP地址,其它格式都是固定的
listeners=PLAINTEXT://192.168.1.103:9092
#配置zooker连接地址
zookeeper.connect=localhost:2181
#启动kafka [root@oracle ~]# cd /usr/local/kafka/bin/ [root@oracle bin]# ./kafka-server-start.sh -daemon ../config/server.properties [root@oracle bin]# netstat -anpt |grep 9092 |grep -i listen tcp6 0 0 192.168.1.188:9092 :::* LISTEN 3135/java二、kafka的基本概念
| 名称 | 解释 |
|---|---|
| Broker | 相当于LB,一个kafka节点就一个broker,一个或者多个broker可以组成一个kafka集群 |
| Topic | kafka根据topic对消息进行分类,发布到kakfa集群的每条消息都需要指定一个topic |
| Producer | 消息生产者,向broker发送消息的客户端 |
| Consumer | 消息消费者,从broker读取消息的客户端 |
| Consumer Group | 每一个consumer属于一个特定的Consumer Group 一条消息可以被多个不同的consumer group消费,但是一个consumer group中只能有一个consumer能够消费该消息 |
| Partition | 一个topick可以分为多个partition 每个partition内部消息是有序的 |
[root@oracle bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.188:2181 --replication-factor 1 --partitions 1 --topic test #显示如下 Created topic test.2.查看有哪些topic
[root@oracle bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 #结果如下:显示了刚才创建的test test3.发送消息(生产消息)
[root@oracle bin]# ./kafka-console-producer.sh --broker-list 192.168.1.188:9092 --topic test >abc >123 >hello4.接收消息(消费消息) 4.1 方式1:
从最后一条消息的偏移量+1开始消费
[root@oracle bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --topic test4.2 方式2
从头开始消费
[root@oracle bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --from-beginning --topic test4.3 注意:
消息会被存储, 会存储到消息的日志文件中 消息是顺序存储 消息是有偏移量的 消费是可以指明偏移量进行消费的四、kafka的单播和多播 1.单播消息
一个 “消费组” 里只会有一个消费者能消费到某一个topic中的消息。于是可以创建多个消费者。这些消费者在同一个消费组中
在两个终端中分别启动一个消费者,会发现只有一个消费者能收到消息 ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --consumer-property group.id=testgroup --topic test2.多播消息
在一些业务场景中需要让一条消费者被多个消费者消费,那么就可以使用多播模式了,
kafka实现多播 只需要让不同的消费者处于不同的消费组即可
./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --consumer-property group.id=testgroup1 --topic test ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --consumer-property group.id=testgroup2 --topic test五、消费组 1.查看所有消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.188:9092 --list #显示结果如下 testgroup testgroup1 解释: --bootstrap-server 指定的是broker的地址,也就是kafka的地址2.显示某一个消费组的信息
./kafka-consumer-groups.sh --bootstrap-server 10.130.222.13:9092 --group etooth --describe 解释: --group:指定消费组 --describe:显示详细信息 显示结果解释: current-offset: 当前消费到第几条消息 log-end-offset: 消息总量条数 lag: 还有多少条消息没有被消费六、分区
topic(主题)和partition(分区)的区别
通过partition对一个topic中的消息分区来存储。这样的有点是:
1.分区存储,可以解决统一存储文件过大的问题
2.提供了读写的吞吐量,读取和写入可以同时在多个分区中进行
./kafka-topics.sh --create --zookeeper 192.168.1.188:2181 --replication-factor 1 --partitions 3 --topic test 解释: --partitions 3 这个参数就是指定在创建topic时要创建几个分区,这里就是创建了3个分区,索引从0开始,保存消息的日志也会分为3个七、kafka集群
这里为了方便就使用1台机器
[root@oracle config]# cp server.properties server1.properties [root@oracle config]# cp server.properties server2.properties
[root@oracle config]# cat server.properties broker.id=0 listeners=PLAINTEXT://192.168.1.188:9092 log.dirs=/tmp/kafka-logs
[root@oracle config]# cat server1.properties broker.id=1 listeners=PLAINTEXT://192.168.1.188:9093 log.dirs=/tmp/kafka-logs1
[root@oracle config]# cat server2.properties broker.id=2 listeners=PLAINTEXT://192.168.1.188:9094 log.dirs=/tmp/kafka-logs2
启动另外两个节点
./kafka-server-start.sh -daemon ../config/server1.properties ./kafka-server-start.sh -daemon ../config/server2.properties [root@oracle bin]# netstat -antp |egrep "9093|9094" tcp6 0 0 192.168.1.188:9093 :::* LISTEN 7363/java tcp6 0 0 192.168.1.188:9094 :::* LISTEN 7771/java1.副本
副本主要是给分区进行备份
在集群中创建一个topic
创建了zhangsan-test得topic 3个副本 2 [root@oracle bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.188:2181 --replication-factor 3 --partitions 2 --topic zhangsan-test 显示结果如下: Created topic zhangsan-test.
显示topic详细信息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xElrtZey-1636375772607)(1kakfa%E7%9A%84%E5%AE%89%E8%A3%85.assets/image-20211107190516450.png)]
partition: 分区 这里得值说明这是这个topic得哪个分区 leader: 领导者,这里得值说明kafka得哪个节点是leader. leader得作用负责消息得读写,然后再将数据同步给其它得节点.消息得生产和消费都会对leader进行。 lsr:可以同步得broker节点和已同步得broker节点,存放在isr结合中2.集群消息的消费和发送 2.1.向集群发送消息
[root@oracle bin]# ./kafka-console-producer.sh --broker-list 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --topic zhangsan-test >1 >2 >32.2.消费集群消息
[root@oracle bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --topic zhangsan-test --from-beginning #消费到消息 1 2 32.3.不同得消费组消费集群消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --topic zhangsan-test --from-beginning -consumer-property group.id=testgroup ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --topic zhangsan-test --from-beginning -consumer-property group.id=testgroup1
一个分区只能被一个消费组中得某一个消费者消费
3.查看集群内所有的消费组[root@oracle bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --list4.查看某一个消费组的信息
[root@oracle bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --group testgroup --describe



