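Note: This article is based on Kafka 3.1.0, using the kafka_2.13-3.1.0 build (2.13 is the Scala version, not the Kafka version).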
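1 About Kafka
Kafka is an open-source distributed stream-processing platform. It uses ZooKeeper for coordination and supports partitioning, multiple replicas and multiple subscribers. It is mainly used as a log collection system or as a message queue.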
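2 Basic Concepts
Broker
A Kafka server node is called a broker. A Kafka cluster contains one or more brokers.
Topic
A topic is similar to a directory: it stores one category of messages, much like a table in a database.
Partition
The data of a topic is split into one or more partitions spread across brokers. This makes Kafka highly scalable, because multiple partitions can be read and written in parallel, which gives high throughput. Messages within a single partition are ordered, but there is no ordering across partitions.
Producer
As the name suggests, the producer creates messages and sends them to a broker, where each message is appended to one partition of its topic.
Consumer
The consumer reads messages of a topic from the brokers.
Consumer group
The group a consumer belongs to. Within one group, each partition of a topic is consumed by only one consumer, so the consumers in the group share the work of reading the topic.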
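The partition ordering rule above can be illustrated with a small sketch. When a message has a key, Kafka's default partitioner chooses the partition by hashing the key (with murmur2) modulo the partition count. All messages with the same key therefore go to the same partition and keep their relative order.

The hypothetical `partition_for_key` function below swaps murmur2 for POSIX `cksum` purely so it runs in plain bash. The partition numbers it prints will not match what Kafka would actually choose.

```shell
#!/usr/bin/env bash
# Illustrative only: Kafka's default partitioner hashes key bytes with murmur2.
# POSIX cksum (a CRC) stands in here so the sketch runs anywhere.
# partition_for_key KEY NUM_PARTITIONS -> prints a partition index
partition_for_key() {
  local key=$1 num_partitions=$2 hash
  # cksum prints "<crc> <byte-count>"; keep only the CRC value
  hash=$(printf '%s' "$key" | cksum | awk '{print $1}')
  echo $(( hash % num_partitions ))
}

# The same key always maps to the same partition, so messages sharing a key
# stay ordered; different keys may land in different partitions.
for key in user-1 user-2 user-1 user-3 user-1; do
  echo "$key -> partition $(partition_for_key "$key" 3)"
done
```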
3 Installation
3.1 Download the Kafka package
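wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar -xf kafka_2.13-3.1.0.tgz
(dlcdn.apache.org only hosts current releases; older versions such as 3.1.0 are kept under https://archive.apache.org/dist/kafka/.)
3.2 Install the dependency: Java 8+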
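Kafka 3.1 runs on Java 8 or later. Before installing a JDK, a minimal sketch like the one below can check whether a suitable one already exists.

`java_major_version` is a hypothetical helper. It parses the first line of `java -version` and handles both the old `1.8.0_xxx` numbering and the newer `11.0.x` style.

```shell
#!/usr/bin/env bash
# java_major_version VERSION_LINE -> prints the Java major version number.
# Handles the legacy "1.8.0_312" scheme and the newer "11.0.14" / "17" scheme.
java_major_version() {
  local ver
  # Extract the quoted version string, e.g. openjdk version "1.8.0_312"
  ver=$(printf '%s\n' "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p')
  case $ver in
    1.*) ver=${ver#1.}; echo "${ver%%.*}" ;;   # 1.8.0_312 -> 8
    *)   echo "${ver%%[.+-]*}" ;;              # 11.0.14 -> 11, 17 -> 17
  esac
}

# `java -version` writes to stderr, so redirect it before parsing.
if command -v java >/dev/null 2>&1; then
  major=$(java_major_version "$(java -version 2>&1 | head -n 1)")
  if [ "${major:-0}" -ge 8 ]; then
    echo "Java $major found, OK for Kafka"
  else
    echo "Java ${major:-unknown} is too old, Kafka needs Java 8+" >&2
  fi
else
  echo "java not found, install it first" >&2
fi
```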
yum install -y java
3.3 Start the ZooKeeper and Kafka processes
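Since this is only a simple setup, I use the ZooKeeper bundled with Kafka; a separate ZooKeeper cluster can also be deployed.
Enter the Kafka directory (or add it to the PATH environment variable), then start the ZooKeeper and Kafka processes directly: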
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
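With `-daemon`, both scripts return immediately, before the services are actually listening. A follow-up command run too early may therefore fail.

Below is a minimal sketch for waiting on a port. It assumes bash (for its `/dev/tcp` redirection) and the coreutils `timeout` command; `wait_for_port` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT TIMEOUT_SECONDS
# Polls until a TCP connection to HOST:PORT succeeds; returns 1 after
# TIMEOUT_SECONDS. /dev/tcp is a bash redirection feature, not a real file.
wait_for_port() {
  local host=$1 port=$2 deadline=$(( SECONDS + $3 ))
  while (( SECONDS < deadline )); do
    # Open (and implicitly close) a connection in a short-lived subshell.
    if timeout 1 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      return 0
    fi
    sleep 1
  done
  return 1
}
```

For example, `wait_for_port localhost 2181 30 && wait_for_port localhost 9092 60` waits for ZooKeeper's default client port and then the broker's default listener. An open port only shows that the process accepts connections, not that the broker has finished starting up.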
Alternatively, and more cleanly, write systemd service units:
[root@node1 ~]# cat /usr/lib/systemd/system/zk.service
[Unit]
Description=Zookeeper
After=network.target

[Service]
ExecStart=/home/kafka/kafka_2.13-3.1.0/bin/zookeeper-server-start.sh /home/kafka/kafka_2.13-3.1.0/config/zookeeper.properties
ExecStop=/home/kafka/kafka_2.13-3.1.0/bin/zookeeper-server-stop.sh /home/kafka/kafka_2.13-3.1.0/config/zookeeper.properties

[Install]
WantedBy=multi-user.target
[root@node1 ~]# cat /usr/lib/systemd/system/kafka.service
[Unit]
Description=Kafka
After=network.target zk.service
Requires=zk.service

[Service]
ExecStart=/home/kafka/kafka_2.13-3.1.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.1.0/config/server.properties
ExecStop=/home/kafka/kafka_2.13-3.1.0/bin/kafka-server-stop.sh /home/kafka/kafka_2.13-3.1.0/config/server.properties

[Install]
WantedBy=multi-user.target
After saving both files, run systemctl daemon-reload and then systemctl start kafka. Because of Requires=zk.service, ZooKeeper is started as well.
4 Create and Consume Messages
Create a topic
[root@node1 kafka_2.13-3.1.0]# bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
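Topic names are restricted. Kafka rejects names that are empty, `.` or `..`, or longer than 249 characters, as well as names containing anything other than ASCII letters, digits, `.`, `_` and `-`.

The hypothetical `valid_topic_name` helper below mirrors that rule, so a script can check a name before calling `kafka-topics.sh`. The create command above omits `--partitions` and `--replication-factor`, so the broker defaults from `server.properties` apply.

```shell
#!/usr/bin/env bash
# valid_topic_name NAME -> exit status 0 if NAME is a legal Kafka topic name.
valid_topic_name() {
  local name=$1
  # Empty, "." and ".." are rejected outright.
  [[ -n $name && $name != "." && $name != ".." ]] || return 1
  # Maximum length is 249 characters.
  (( ${#name} <= 249 )) || return 1
  # Only ASCII letters, digits, '.', '_' and '-' are allowed.
  [[ $name =~ ^[A-Za-z0-9._-]+$ ]]
}

for t in quickstart-events "bad topic" ..; do
  if valid_topic_name "$t"; then echo "ok: $t"; else echo "invalid: $t"; fi
done
```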
Write messages
[root@node1 kafka_2.13-3.1.0]# bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>hello world!
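The console producer treats every input line as one message, so input can also be piped in instead of typed.

Below is a minimal sketch of two wrapper functions. It assumes a hypothetical `KAFKA_HOME` variable pointing at the kafka_2.13-3.1.0 directory. `produce_keyed` uses the console producer's `parse.key` and `key.separator` properties so that each `key:value` line is sent with a key.

```shell
#!/usr/bin/env bash
# Assumption: KAFKA_HOME points at the extracted kafka_2.13-3.1.0 directory.

# produce_lines TOPIC [BOOTSTRAP]: sends every stdin line as one message.
produce_lines() {
  local topic=$1 bootstrap=${2:-localhost:9092}
  "${KAFKA_HOME:?KAFKA_HOME is not set}/bin/kafka-console-producer.sh" \
    --topic "$topic" --bootstrap-server "$bootstrap"
}

# produce_keyed TOPIC [BOOTSTRAP]: stdin lines look like key:value; the
# parse.key and key.separator properties split each line into key and value.
produce_keyed() {
  local topic=$1 bootstrap=${2:-localhost:9092}
  "${KAFKA_HOME:?KAFKA_HOME is not set}/bin/kafka-console-producer.sh" \
    --topic "$topic" --bootstrap-server "$bootstrap" \
    --property parse.key=true --property key.separator=:
}
```

For example, `printf 'user-1:login\nuser-1:logout\n' | produce_keyed quickstart-events` sends two messages with key `user-1`, which land in the same partition. Adding `--property print.key=true` to the console consumer shows the keys.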
Consume messages
[root@node1 kafka_2.13-3.1.0]# bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
hello world!
5 Basic Commands
List topics
[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
quickstart-events
Describe a specific topic
[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events  TopicId: N2-9eCsZSfGzdOoCnb8nMg  PartitionCount: 1  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
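In the describe output, `Replicas` lists the brokers that should hold a copy of each partition, and `Isr` lists the in-sync ones. A partition whose ISR is shorter than its replica list is under-replicated.

`kafka-topics.sh --describe --under-replicated-partitions` reports this directly. The hypothetical awk-based `under_replicated` filter below is a sketch of the same check applied to saved describe output.

```shell
#!/usr/bin/env bash
# under_replicated: reads `kafka-topics.sh --describe` output on stdin and
# prints partition lines whose Isr list is shorter than their Replicas list.
under_replicated() {
  awk '{
    replicas = ""; isr = ""
    # Locate the values that follow the "Replicas:" and "Isr:" labels.
    for (i = 1; i < NF; i++) {
      if ($i == "Replicas:") replicas = $(i + 1)
      if ($i == "Isr:") isr = $(i + 1)
    }
    # Topic summary lines have no Replicas: field and are skipped.
    if (replicas != "" && split(isr, a, ",") < split(replicas, b, ","))
      print
  }'
}
```

Usage: `./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 | under_replicated`. On the single-broker setup in this article (replication factor 1), it always prints nothing.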
List consumer groups
[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
console-consumer-83748
Describe a specific consumer group
[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-consumer-groups.sh --describe --group console-consumer-83748 --bootstrap-server localhost:9092
GROUP                   TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                            HOST            CLIENT-ID
console-consumer-83748  quickstart-events  0          -               0               -    console-consumer-30784d7c-6fb5-40b5-a028-188fc20227e6  /192.168.0.111  console-consumer
6 Monitoring Metrics
6.1 Install kafka_exporter
Download the latest kafka_exporter release package:
wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.4.2/kafka_exporter-1.4.2.linux-amd64.tar.gz
tar -xf kafka_exporter-1.4.2.linux-amd64.tar.gz
Again, write a systemd service file:
[root@node1 ~]# cat /usr/lib/systemd/system/kafka_exporter.service
[Unit]
Description=Kafka Exporter
After=network.target kafka.service
Requires=kafka.service

[Service]
ExecStart=/home/kafka/kafka_exporter-1.4.2.linux-amd64/kafka_exporter --kafka.server=localhost:9092
ExecStop=/bin/kill -15 $MAINPID

[Install]
WantedBy=multi-user.target
The exported metrics can then be viewed at http://{node1-ip}:9308/metrics.
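The metrics page is in the Prometheus text format, which can be long. Below is a minimal sketch for pulling out a single metric from the command line. `metric_lines` is a hypothetical helper; `kafka_brokers` is one of the gauges kafka_exporter exposes.

```shell
#!/usr/bin/env bash
# metric_lines NAME
# Reads Prometheus text exposition format on stdin and prints only the samples
# of metric NAME (with or without labels), skipping "# HELP"/"# TYPE" lines.
metric_lines() {
  awk -v n="$1" '!/^#/ && (index($0, n "{") == 1 || index($0, n " ") == 1)'
}
```

For example, `curl -s http://localhost:9308/metrics | metric_lines kafka_brokers` prints the number of brokers the exporter sees. Matching the name followed by `{` or a space avoids also catching longer names that share the same prefix.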
For deploying Prometheus, see the post "Setting Up Prometheus" (《Prometheus搭建》).
Once Prometheus is running, all that is left is to add a job for kafka_exporter to the Prometheus configuration so that Prometheus can scrape its data:
scrape_configs:
  - job_name: "kafka"
    static_configs:
      - targets: ["192.168.0.111:9308"]
After reloading the Prometheus configuration, the Kafka metrics can be queried.
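How to reload depends on how Prometheus was started. The sketch below assumes the config lives at the hypothetical path `/etc/prometheus/prometheus.yml` and that Prometheus listens on `localhost:9090`.

`promtool check config` validates the file first. The `POST /-/reload` endpoint only works when Prometheus runs with `--web.enable-lifecycle`; otherwise, send the Prometheus process a `SIGHUP` instead.

```shell
#!/usr/bin/env bash
# reload_prometheus [CONFIG] [URL]
# Both defaults are assumptions; adjust them to your installation.
reload_prometheus() {
  local config=${1:-/etc/prometheus/prometheus.yml}
  local url=${2:-http://localhost:9090}
  # Refuse to reload a configuration that does not validate.
  promtool check config "$config" || return 1
  # Requires Prometheus to be started with --web.enable-lifecycle.
  curl -fsS -X POST "$url/-/reload"
}
```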
Messages produced per second (more real-time):
sum(rate(kafka_topic_partition_current_offset{instance="$instance", topic=~"$topic"}[1m])) by (topic)
Messages produced per minute (smoother, averaged over 5 minutes):
sum(delta(kafka_topic_partition_current_offset{instance=~'$instance', topic=~"$topic"}[5m])/5) by (topic)
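Both queries measure how fast the partition offsets grow:

- `rate(x[1m])` gives the average increase per second over the last minute.
- `delta(x[5m]) / 5` gives the increase over the last 5 minutes divided by 5, that is, an average per minute.

The arithmetic can be checked by hand with two offset samples. `offset_rates` is a hypothetical helper. Prometheus itself extrapolates to the window edges, so its values can differ slightly from this simple calculation.

```shell
#!/usr/bin/env bash
# offset_rates OLD_OFFSET NEW_OFFSET WINDOW_SECONDS
# Average growth of an offset between two samples taken WINDOW_SECONDS apart.
offset_rates() {
  local diff=$(( $2 - $1 ))
  awk -v d="$diff" -v w="$3" \
    'BEGIN { printf "per_second=%.2f per_minute=%.2f\n", d / w, d * 60 / w }'
}

# 600 new messages in a 5-minute window:
offset_rates 1000 1600 300   # prints per_second=2.00 per_minute=120.00
```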
Messages consumed per minute:
sum(delta(kafka_consumergroup_current_offset{instance=~'$instance',topic=~"$topic"}[5m])/5) by (consumergroup, topic)
Consumer group lag:
sum(kafka_consumergroup_lag{instance="$instance",topic=~"$topic"}) by (consumergroup, topic)
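For a quick point-in-time check without Prometheus, the same per-group, per-topic lag sum can be computed from `kafka-consumer-groups.sh --describe` output. `sum_lag` is a hypothetical awk-based helper that assumes the column layout shown in section 5.

```shell
#!/usr/bin/env bash
# sum_lag: reads `kafka-consumer-groups.sh --describe` output on stdin and
# prints "<group> <topic> <total lag>" for each group/topic pair.
# Expected columns: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
sum_lag() {
  # Skip header lines and rows whose LAG is "-" (no committed offset yet).
  awk '$1 != "GROUP" && NF >= 6 && $6 ~ /^[0-9]+$/ { lag[$1 " " $2] += $6 }
       END { for (k in lag) print k, lag[k] }'
}
```

Usage: `./bin/kafka-consumer-groups.sh --describe --all-groups --bootstrap-server localhost:9092 | sum_lag`. When several groups are present, the output order is unspecified.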
References:
- https://kafka.apache.org/quickstart
- https://github.com/danielqsj/kafka_exporter



