栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka简单上手

kafka简单上手

注:本文基于kafka 2.13编写

1 关于kafka

kafka是一个开源的分布式流处理平台,基于zookeeper协调,支持分区、多副本、多订阅者,主要用做日志收集系统或者是消息队列系统。

2 基本概念

Broker
Kafka服务器节点称为broker,一个kafka集群包含一个或多个节点Topic
topic类似一个目录,用来存放不同类别的消息,类似数据库的表Partition
topic中的数据被分成一个或多个partition,分布在不同的broker中,这样扩展性就很强,能都同时读写多个partition,从而达到高吞吐。另外单个partition中的数据是有序的,但是不同partition间的数据是没有顺序的Producer
顾名思义,消息的生成者,生成的消息发到broker上,然后会被追加到对应topic的某个partition中Consumer
消息的消费者,从broker中读取对应topic中的消息Consumer group
consumer所属的组 3 安装 3.1 下载kafka安装包

wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar -xf kafka_2.13-3.1.0.tgz
3.2 安装依赖Java 8+
yum install -y java
3.3 启动zookeeper和kafka进程

因为只是简单的使用,我就直接使用kafka自带的zookeeper,也可以单独部署zk集群。
进入kafka目录,或者将该目录加入PATH环境变量,然后直接启动zookeeper和kafka进程

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

也可以优雅的写个service服务,

[root@node1 ~]# cat /usr/lib/systemd/system/zk.service
[Unit]
Description=Zookeeper
After=network.target

[Service]
ExecStart=/home/kafka/kafka_2.13-3.1.0/bin/zookeeper-server-start.sh /home/kafka/kafka_2.13-3.1.0/config/zookeeper.properties
ExecStop=/home/kafka/kafka_2.13-3.1.0/bin/zookeeper-server-stop.sh /home/kafka/kafka_2.13-3.1.0/config/zookeeper.properties

[Install]
WantedBy=multi-user.target
[root@node1 ~]# cat /usr/lib/systemd/system/kafka.service
[Unit]
Description=Kafka
After=network.target zk.service
Requires=zk.service

[Service]
ExecStart=/home/kafka/kafka_2.13-3.1.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.1.0/config/server.properties
ExecStop=/home/kafka/kafka_2.13-3.1.0/bin/kafka-server-stop.sh /home/kafka/kafka_2.13-3.1.0/config/server.properties

[Install]
WantedBy=multi-user.target
4 创建和消费消息

创建消息topic

[root@node1 kafka_2.13-3.1.0]# bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

写入消息

[root@node1 kafka_2.13-3.1.0]# bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
hello world!

消费消息

[root@node1 kafka_2.13-3.1.0]# bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
helo world!
5 基本命令

查看topic list

[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-topics.sh --list --bootstrap-server calhost:9092
__consumer_offsets
quickstart-events

查看指定topic详情

[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events	TopicId: N2-9eCsZSfGzdOoCnb8nMg	PartitionCount: ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: quickstart-events	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

查看consumer group list

[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
console-consumer-83748

查看指定consumer group详情

[root@node1 kafka_2.13-3.1.0]# ./bin/kafka-consumer-groups.sh --describe --group console-consumer-83748 --bootstrap-server localhost:9092

GROUP                  TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
console-consumer-83748 quickstart-events 0          -               0               -               console-consumer-30784d7c-6fb5-40b5-a028-188fc20227e6 /192.168.0.111  console-consumer
6 监控metrics 6.1 安装kafka_exporter

下载最新版本kafka_exporter安装包,

wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.4.2/kafka_exporter-1.4.2.linux-amd64.tar.gz
tar -xf kafka_exporter-1.4.2.linux-amd64.tar.gz

同样写个service文件,

[root@node1 ~]# cat /usr/lib/systemd/system/kafka_exporter.service
[Unit]
Description=Kafka
After=network.target kafka.service
Requires=kafka.service

[Service]
ExecStart=/home/kafka/kafka_exporter-1.4.2.linux-amd64/kafka_exporter --kafka.server=localhost:9092
ExecStop=/bin/kill -15 $MAINPID

[Install]
WantedBy=multi-user.target

然后就能通过http://{node1-ip}:9308/metrics查看导出的metrics,

6.2 接入prometheus

prometheus部署参考——《Prometheus搭建》

prometheus搭建好后,我们需要做的就是将kafka_exporter这个job加入prometheus的配置中,这样它就能拉取到数据。

scrape_configs:
  - job_name: "kafka"
    static_configs:
      - targets: ["192.168.0.111:9308"]

reload prometheus配置后,就能查询到kafka的相关metrics了

查看消息每秒生产速率(实时性更强)

sum(rate(kafka_topic_partition_current_offset{instance="$instance", topic=~"$topic"}[1m])) by (topic)

查看消息每分钟生产速率(更平均)

sum(delta(kafka_topic_partition_current_offset{instance=~'$instance', topic=~"$topic"}[5m])/5) by (topic)

查看消息每分钟消费速率

sum(delta(kafka_consumergroup_current_offset{instance=~'$instance',topic=~"$topic"}[5m])/5) by (consumergroup, topic)

查看消费组lag

sum(kafka_consumergroup_lag{instance="$instance",topic=~"$topic"}) by (consumergroup, topic) 

参考文档:

    https://kafka.apache.org/quickstarthttps://github.com/danielqsj/kafka_exporter
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734158.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号