- kafka
- 一、Kafka原始安装
- 1.1 使用docker安装zk
- 1.2 安装kafka
- 1.2.1 上传压缩包到centos
- 1.2.2 解压压缩文件
- 1.2.3 修改配置文件
- 1.2.3 启动kafka
- 1.3 测试发送和接收消息
- 1.3.1 发送消息
- 1.3.2 接收消息
- 1.4 kafka-eagle 监控平台
- 1.4.1 解压文件
- 1.4.2 配置eagle环境变量
- 1.4.3 修改system-config.properties
- 1.4.4 启动kafka-eagle
- 二、kafka基础概念
- 2.1 Broker
- 2.2 Topic
- 2.3 Pratition
- 2.4 offset
- 2.4 kafka中消息⽇志⽂件中保存的内容
消息对列解决的是服务与服务之间的通信问题,从而达到解耦的目的提高吞吐量。
kafka消息是存储在日志文件中
一、Kafka原始安装安装要求: 需要安装jdk
版本:kafka:kafka_2.11-2.4.1.tgz 、zk(使用docker安装,zk 版本:3.4.9)
zk链接工具:ZooInspector
$ docker search zookeeper # 查看一下镜像 $ docker pull zookeeper:3.4.9 # 拉取指定版本zk镜像 $ docker images # 查看image ID $ mkdir -p /docker/zk/data $ docker run -d -p 2181:2181 -v /docker/zk/data:/data/ --name zookeeper --privileged 3b83d9104a4c # 最后跟着 image ID1.2 安装kafka 1.2.1 上传压缩包到centos 1.2.2 解压压缩文件
$ tar -zxvf kafka_2.11-2.4.1.tgz1.2.3 修改配置文件
修改配置文件:/kafka2.11-2.4/config/server.properties
#broker.id属性在kafka集群中必须要是唯一 broker.id=0 #kafka部署的机器ip和提供服务的端⼝号 listeners=PLAINTEXT://192.168.3.47:9092 #kafka的消息存储⽂件 log.dir=/usr/local/data/kafka-logs #kafka连接zookeeper的地址 zookeeper.connect=192.168.3.47:2181
1.2.3 启动kafka搭建kafak集群,主要修改broker.id的值,和修改 端口号即可
$ cd bin # 进入到bin目录 $ ./kafka-server-start.sh -daemon ../config/server.properties
使用zk链接工具查看broker信息
1.3 测试发送和接收消息 1.3.1 发送消息kafka-console-producer.sh这个命令在bin目录下
./kafka-console-producer.sh --broker-list 192.168.3.47:9092 --topic testtopic1.3.2 接收消息
- 从最后一条消息的偏移量+1开始消费
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.3.47:9092 --topic testtopic
- 从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.3.47:9092 --from-beginning --topic testtopic1.4 kafka-eagle 监控平台
kafka-eagle-bin-1.2.4.tar.gz
1.4.1 解压文件$ tar -zxvf kafka-eagle-bin-1.2.4.tar.gz # 解压后得到下面的web压缩包,需要再进行解压 $ tar -zxvf kafka-eagle-web-1.2.4-bin.tar.gz1.4.2 配置eagle环境变量
依赖jdk,需要先安装jdk
$ vim /etc/profile export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-bin-1.2.4/kafka-eagle-web-1.2.4 export PATH=$PATH:$KE_HOME/bin:$JAVA_HOME/bin
$ source /etc/profile1.4.3 修改system-config.properties
/usr/local/kafka-eagle/kafka-eagle-bin-1.2.4/kafka-eagle-web-1.2.4/conf/system-config.properties
###################################### # multi zookeeper&kafka cluster list ###################################### kafka.eagle.zk.cluster.alias=cluster1 #cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181 #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 cluster1.zk.list=192.168.3.47:2181 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=25 ###################################### # kafka eagle webui port ###################################### kafka.eagle.webui.port=8048 ###################################### # kafka offset storage ###################################### cluster1.kafka.eagle.offset.storage=kafka cluster2.kafka.eagle.offset.storage=zk ###################################### # enable kafka metrics ###################################### kafka.eagle.metrics.charts=true kafka.eagle.sql.fix.error=true ###################################### # alarm email configure ###################################### kafka.eagle.mail.enable=true kafka.eagle.mail.sa=alert_sa kafka.eagle.mail.username=alert_sa@163.com kafka.eagle.mail.password=mqslimczkdqabbbh kafka.eagle.mail.server.host=smtp.163.com kafka.eagle.mail.server.port=25 ###################################### # delete kafka topic token ###################################### kafka.eagle.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### kafka.eagle.sasl.enable=false kafka.eagle.sasl.protocol=SASL_PLAINTEXT kafka.eagle.sasl.mechanism=PLAIN ###################################### # kafka jdbc driver address ###################################### kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://192.168.3.47:3306/kafka_eagle?useSSL=false&serverTimezone=GMT%2B8&characterEncoding=utf8&allowMultiQueries=true&autoReconnect=true kafka.eagle.username=root kafka.eagle.password=root1.4.4 启动kafka-eagle
/usr/local/kafka-eagle/kafka-eagle-bin-1.2.4/kafka-eagle-web-1.2.4/bin
$ chmod 777 ke.sh $ ./ke.sh start
访问: 192.168.3.47:8048/ke
默认账号密码: admin/123456
二、kafka基础概念 2.1 Broker处理消息的节点,是一个逻辑概念,可以当成kafka服务器。
2.2 Topictopic可以实现消息的分类,不同消费者订阅不同的topic;
主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被
订阅该topic的消费者消费
2.3 Pratitionpratition分区。
⼀个主题中的消息量是⾮常⼤的,因此可以通过分区的设置,来分布式存储这些消息
分区可以解决问题:
-
分区存储,可以解决统⼀存储⽂件过⼤的问题
-
提供了读写的吞吐量:读和写可以同时在多个分区中进⾏
2.4 offset每个区里面有事分段存储。segment
偏移量(消息存放的位置)
2.4 kafka中消息⽇志⽂件中保存的内容-
在server.properti中配置的日志保存路径中 00000.log: 这个⽂件中保存的就是消息
-
__consumer_offsets-49:
kafka内部⾃⼰创建了__consumer_offsets主题包含了50个分区。这个主题⽤来存放消费
者消费某个主题的偏移量。因为每个消费者都会⾃⼰维护着消费的主题的偏移量,也就是
说每个消费者会把消费的主题的偏移量⾃主上报给kafka中的默认主题:
consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。
-
提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets
主题的分区数
-
提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前
offset的值
-
-
⽂件中保存的消息,默认保存7天。七天到后消息会被删除。



