栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka 学习笔记【持续更新】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka 学习笔记【持续更新】

文章目录
  • kafka
  • 一、Kafka原始安装
    • 1.1 使用docker安装zk
    • 1.2 安装kafka
      • 1.2.1 上传压缩包到centos
      • 1.2.2 解压压缩文件
      • 1.2.3 修改配置文件
      • 1.2.3 启动kafka
    • 1.3 测试发送和接收消息
      • 1.3.1 发送消息
      • 1.3.2 接收消息
    • 1.4 kafka-eagle 监控平台
      • 1.4.1 解压文件
      • 1.4.2 配置eagle环境变量
      • 1.4.3 修改system-config.properties
      • 1.4.4 启动kafka-eagle
  • 二、kafka基础概念
    • 2.1 Broker
    • 2.2 Topic
    • 2.3 Pratition
    • 2.4 offset
    • 2.4 kafka中消息⽇志⽂件中保存的内容

kafka

消息对列解决的是服务与服务之间的通信问题,从而达到解耦的目的提高吞吐量。

kafka消息是存储在日志文件中

一、Kafka原始安装

安装要求: 需要安装jdk

版本:kafka:kafka_2.11-2.4.1.tgz 、zk(使用docker安装,zk 版本:3.4.9)
zk链接工具:ZooInspector

1.1 使用docker安装zk
$ docker search zookeeper # 查看一下镜像

$ docker pull zookeeper:3.4.9  # 拉取指定版本zk镜像

$ docker images  # 查看image ID

$ mkdir -p /docker/zk/data
$ docker run -d -p 2181:2181 -v /docker/zk/data:/data/ --name zookeeper --privileged 3b83d9104a4c # 最后跟着 image ID

1.2 安装kafka 1.2.1 上传压缩包到centos 1.2.2 解压压缩文件
$ tar -zxvf kafka_2.11-2.4.1.tgz
1.2.3 修改配置文件

修改配置文件:/kafka2.11-2.4/config/server.properties

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://192.168.3.47:9092 
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.3.47:2181

搭建kafak集群,主要修改broker.id的值,和修改 端口号即可

1.2.3 启动kafka
$ cd bin # 进入到bin目录
$ ./kafka-server-start.sh -daemon ../config/server.properties

使用zk链接工具查看broker信息

1.3 测试发送和接收消息 1.3.1 发送消息

kafka-console-producer.sh这个命令在bin目录下

./kafka-console-producer.sh --broker-list 192.168.3.47:9092 --topic testtopic
1.3.2 接收消息
  • 从最后一条消息的偏移量+1开始消费
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.3.47:9092 --topic testtopic
  • 从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.3.47:9092 --from-beginning --topic testtopic
1.4 kafka-eagle 监控平台

kafka-eagle-bin-1.2.4.tar.gz

1.4.1 解压文件
$ tar -zxvf kafka-eagle-bin-1.2.4.tar.gz # 解压后得到下面的web压缩包,需要再进行解压
$ tar -zxvf kafka-eagle-web-1.2.4-bin.tar.gz
1.4.2 配置eagle环境变量

依赖jdk,需要先安装jdk

$ vim  /etc/profile

export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-bin-1.2.4/kafka-eagle-web-1.2.4
export PATH=$PATH:$KE_HOME/bin:$JAVA_HOME/bin

$ source /etc/profile

1.4.3 修改system-config.properties

/usr/local/kafka-eagle/kafka-eagle-bin-1.2.4/kafka-eagle-web-1.2.4/conf/system-config.properties

######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
#cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
cluster1.zk.list=192.168.3.47:2181

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk

######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=true

######################################
# alarm email configure
######################################
kafka.eagle.mail.enable=true
kafka.eagle.mail.sa=alert_sa
kafka.eagle.mail.username=alert_sa@163.com
kafka.eagle.mail.password=mqslimczkdqabbbh
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
kafka.eagle.sasl.enable=false
kafka.eagle.sasl.protocol=SASL_PLAINTEXT
kafka.eagle.sasl.mechanism=PLAIN

######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.3.47:3306/kafka_eagle?useSSL=false&serverTimezone=GMT%2B8&characterEncoding=utf8&allowMultiQueries=true&autoReconnect=true
kafka.eagle.username=root
kafka.eagle.password=root


1.4.4 启动kafka-eagle

/usr/local/kafka-eagle/kafka-eagle-bin-1.2.4/kafka-eagle-web-1.2.4/bin

$ chmod 777  ke.sh
$ ./ke.sh start

访问: 192.168.3.47:8048/ke

默认账号密码: admin/123456

二、kafka基础概念 2.1 Broker

处理消息的节点,是一个逻辑概念,可以当成kafka服务器。

2.2 Topic

topic可以实现消息的分类,不同消费者订阅不同的topic;

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被

订阅该topic的消费者消费

2.3 Pratition

pratition分区。

⼀个主题中的消息量是⾮常⼤的,因此可以通过分区的设置,来分布式存储这些消息

分区可以解决问题:

  • 分区存储,可以解决统⼀存储⽂件过⼤的问题

  • 提供了读写的吞吐量:读和写可以同时在多个分区中进⾏

每个区里面有事分段存储。segment

2.4 offset

偏移量(消息存放的位置)

2.4 kafka中消息⽇志⽂件中保存的内容
  • 在server.properti中配置的日志保存路径中 00000.log: 这个⽂件中保存的就是消息

  • __consumer_offsets-49:

    kafka内部⾃⼰创建了__consumer_offsets主题包含了50个分区。这个主题⽤来存放消费

    者消费某个主题的偏移量。因为每个消费者都会⾃⼰维护着消费的主题的偏移量,也就是

    说每个消费者会把消费的主题的偏移量⾃主上报给kafka中的默认主题:

    consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。

    • 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets

      主题的分区数

    • 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前

      offset的值

  • ⽂件中保存的消息,默认保存7天。七天到后消息会被删除。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/678628.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号