栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka集群搭建

kafka集群搭建

前置条件

1、kafka集群的安装配置依赖zookeeper,搭建kafka集群之前,需要搭建好zookeeper集群

2、需要有jdk环境

搭建kafka集群

解压安装包

tar -zxf kafka_2.12-2.7.0.tgz -C /opt

创建日志目录

mkdir /opt/kafka_2.12-2.7.0/logs

修改配置

vi /opt/kafka_2.12-2.7.0/config/server.properties
broker.id=1                                         #broker的全局唯一编号不能重复,建议与zookeeper的myid对应
listeners=PLAINTEXT://$IP:9092            			#broker监听ip和端口
num.network.threads=3                               #borker进行网络处理的线程数
num.io.threads=8                                    #borker进行I/O处理的线程数
socket.send.buffer.bytes=102400                     #发送缓冲区大小,即发送消息先发送到缓冲区,当缓冲区满了在一起发出去
socket.receive.buffer.bytes=102400                  #接收缓冲区大小,接收消息先放到接收缓冲区,当达到这个数量时同步到磁盘
socket.request.max.bytes=104857600                  #向kafka套接字请求的最大字节数量,防止服务器outofmemory,大小最好不要超过java的堆栈大小
log.dirs=/opt/kafka_2.12-2.7.0/logs      			#消息存放目录,不是日志目录
num.partitions=1                                    #每个topic的默认分区数
num.recovery.threads.per.data.dir=1                 #处理消息目录的线程数,若设置了3个消息路径,改参数为2,那么一共需要6个线程
offsets.topic.replication.factor=1                 
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168                             #消息过期时间,默认为1周
log.segment.bytes=1073741824                        #日志文件中每个segment的大小,默认为1G,topic的分区是以一堆segment文件存储的,超过此限制会建立一个新的日志文件。此参数若在创建topic时的指定,那么参数覆盖,以指定的为准
log.retention.check.interval.ms=300000              #如上设置了每个segment文件大小为1G,那么此时间间隔就是检查他的大小有没有达到1G,检查的时间间隔
zookeeper.connect=192.168.18.237:2181,192.168.18.238:2181,192.168.18.239:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

配置文件解释:

log.dirs:Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。

num.recovery.threads.per.data.dir:

对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:
  • 服务器正常启动,用于打开每个分区的日志片段;
  • 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
  • 服务器正常关闭,用于关闭日志片段

默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的

服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

测试检验

bin目录下启动kafka

kafka-server-start.sh -daemon./config/server.properties

使用jps可以看到kafka进程

zookeeper+kafka集群消息发布

创建一个topic,3个分区,3个副本

./bin/kafka-topics.sh --create --zookeeeper 192.168.18.237:2181,192.168.18.238:2181,192.168.18.239:2181 -replication-factor 3 --partitions 3 --topic test

查看topic列表

 ./bin/kafka-topics.sh --list --zookeeper 192.168.18.237:2181,192.168.18.238:2181,192.168.18.239:2181

用一台服务器模拟生产者,发布消息

 ./bin/kafka-console-producer.sh --broker-list 192.168.18.237:9092 --topic test

用另一台服务器模拟消费者,消费消息

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic test --from-beginning

注:–from-beginning 表示从头接收,否则只能从当前的offset接收新消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/618424.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号