栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Centos7 安装Kafka

Centos7 安装Kafka

文章目录

一、kafka和zk的安装二、修改配置

修改kafka配置

创建kafka消息存放目录第一台机器上server.properties的配置:第二台机器上server.properties的配置:第三台机器上server.properties的配置: 三、启动服务

启动zookeeper服务分别启动3个机器上的kafka 四、集群下创建Topic

创建一个名为myschooling的topic,设置3个副本、30个分区查看已经创建的 topics 五、集群验证

集群下启动一个Producer集群下启动一个Consumer

通过zookeeper进一步验证 六、kafka相关配置信息及命令
*说明:本文介绍使用外部zk集群搭建kafka集群。

一、kafka和zk的安装

kafka单机版安装可参考:CentOS安装kafka-2.8.0单机版

kafka单机版伪集群可参考:CentOS7安装Kafka 2.8.0单机集群(伪集群)

zk集群安装可参考:linux下安装zookeeper集群-3.6.3版本


二、修改配置 修改kafka配置

修改配置文件中的broker.id分别为1、2、3;listeners这一行取消注释,端口号分别为9092、9092、9092;log.dirs分别设置为kafkadata/kafkadata0~kafkadata/kafkadata9(先创建文件夹) 创建kafka消息存放目录

cd /usr/local/kafka
mkdir -p kafkadata/kafkadata0 kafkadata/kafkadata1 kafkadata/kafkadata2 kafkadata/kafkadata3 kafkadata/kafkadata4 kafkadata/kafkadata5 kafkadata/kafkadata6 kafkadata/kafkadata7 kafkadata/kafkadata8 kafkadata/kafkadata9
第一台机器上server.properties的配置:
broker.id=1
listeners = PLAINTEXT://10.230.14.224:9092
log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9
zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181
第二台机器上server.properties的配置:
broker.id=2
listeners = PLAINTEXT://10.230.14.146:9092
log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9
zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181
第三台机器上server.properties的配置:
broker.id=3
listeners = PLAINTEXT://10.230.14.66:9092
log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9
zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181

总得配置(各配置项说明):

############################# Server Basics #############################
#The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://10.230.14.224:9092
#Hostname and port the broker will advertise to producers and consumers. If not set, 
#it uses the value for "listeners" if configured.  Otherwise, it will use the value
#returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
#The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
#The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
#The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
#The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
#The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
#A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9
#The default number of log partitions per topic. More partitions allow greater
#parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=30

#The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
#This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
#The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
#A size-based retention policy for logs. Segments are pruned from the log unless the remaining
#segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
#The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
#The interval at which log segments are checked to see if they can be deleted according
#to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#Zookeeper connection
#zookeeper.connect=localhost:2181
zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181
#Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
#The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
#The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
#The default value for this is 3 seconds.
#We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
#However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=3
default.replication.factor=3
replica.fetch.max.bytes=2048000
num.replica.fetchers=4
auto.create.topics.enable=false
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.max.messages=4000  
replica.lag.time.max.ms=10000
delete.topic.enable=true

kafka服务器配置文件说明

# ----------------------系统相关----------------------
# broker的全局唯一编号,不能重复,和zookeeper的myid是一个意思
broker.id=0
 
# broker监听IP和端口也可以是域名
listeners=PLAINTEXT://172.17.56.175:9092
 
# 用于接收请求的线程数量
num.network.threads=3
 
# 用于处理请求的线程数量,包括磁盘IO请求,这个数量和log.dirs配置的目录数量有关,这里的数量不能小于log.dirs的数量,
# 虽然log.dirs是配置日志存放路径,但是它可以配置多个目录后面用逗号分隔
num.io.threads=8
 
# 发送缓冲区大小,也就是说发送消息先发送到缓冲区,当缓冲区满了之后一起发送出去
socket.send.buffer.bytes=102400
 
# 接收缓冲区大小,同理接收到缓冲区,当到达这个数量时就同步到磁盘
socket.receive.buffer.bytes=102400
 
# 向kafka套接字请求最大字节数量,防止服务器OOM,也就是OutOfMemery,这个数量不要超过JAVA的堆栈大小,
socket.request.max.bytes=104857600
 
# 日志路径也就是分区日志存放的地方,你所建立的topic的分区就在这里面,但是它可以配置多个目录后面用逗号分隔
log.dirs=/tmp/kafka-logs
 
# 消息体(也就是往Kafka发送的单条消息)最大大小,单位是字节,必须小于socket.request.max.bytes值
message.max.bytes =5000000
 
# 自动平衡由于某个broker故障会导致Leader副本迁移到别的broker,当之前的broker恢复后也不会迁移回来,有时候我们需要
# 手动进行平衡避免同一个主题不同分区的Leader副本在同一台broker上,下面这个参数就是开启自动平衡功能
auto.leader.rebalance.enable=true
 
#设置了上面的自动平衡,当故障转移后,隔300秒(默认)触发一个定时任务进行平衡操作,而只有代理的不均衡率为10%以上才会执行
leader.imbalance.check.interval.seconds=300
 
# 设置代理的不均衡率,默认是10%
leader.imbalance.per.broker.percentage=10
 
# ---------------分区相关-------------------------
 
# 默认分区数量,当建立Topic时不指定分区数量,默认就1
num.partitions=1
 
# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true
 
# 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =2
 
# ---------------日志相关-------------------------
 
# segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。
# 这里就是用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
 
# 日志文件中每个segment的大小,默认为1G。topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,当超过这个大小会建立一个新日志文件
# 这个参数会被topic创建时的指定参数覆盖,如果你创建Topic的时候指定了这个参数,那么你以你指定的为准。
log.segment.bytes=1073741824
 
# 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
# log.retention.bytes和log.retention.minutes|hours任意一个达到要求,都会执行删除
# 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准
log.retention.hours|minutes=168
 
# 这个参数会在日志segment没有达到log.segment.bytes设置的大小默认1G的时候,也会强制新建一个segment会被
# topic创建时的指定参数覆盖
log.roll.hours=168 
 
# 上面的参数设置了每一个segment文件的大小是1G,那么就需要有一个东西去定期检查segment文件有没有达到1G,多长时间去检查一次,
# 就需要设置一个周期性检查文件大小的时间(单位是毫秒)。
log.retention.check.interval.ms=300000
 
# 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,
# 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准
log.cleanup.policy = delete
 
# 是否启用日志清理功能,默认是启用的且清理策略为compact,也就是压缩。
log.cleaner.enable=false
 
# 日志清理时所使用的缓存空间大小
log.cleaner.dedupe.buffer.size=134217728
 
# log文件"sync"到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
# 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
# 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
# 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
# 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=9223372036854775807
 
# 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
 
# 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
# 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
# 达到阀值,也将触发.
log.flush.interval.ms=None
 
# -----------复制(Leader、replicas) 相关-------------------
# partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms=30000
# replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),
# 并认为它是死的,不会再加入管理中
replica.lag.time.max.ms=10000
# follower与leader之间的socket超时时间
replica.socket.timeout.ms=300000
# leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=65536
# replicas每次获取数据的最大大小
replica.fetch.max.bytes=1048576
# replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms=500
# fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes=1
# leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1
# 最小副本数量
min.insync.replicas=2
三、启动服务 启动zookeeper服务

kafka需要依赖ZK,安装包中已经自带了一个ZK,也可以改成指定已运行的ZK。如果改成指定的ZK需要修改 kafka 安装目录下的config/server.properties 文件中的 zookeeper.connect 。这里我们使用外部zk。zk集群的安装上面有参考链接,安装好集群之后并启动

分别启动3个机器上的kafka
cd /usr/local/kafka/bin
nohup ./kafka-server-start.sh ../config/server.properties &
四、集群下创建Topic 创建一个名为myschooling的topic,设置3个副本、30个分区
cd /usr/local/kafka/bin
./kafka-topics.sh --create --zookeeper 10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181 --replication-factor 3 --partitions 30 --topic myschooling

查看已经创建的 topics
cd /usr/local/kafka/bin
./kafka-topics.sh -list -zookeeper 10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181



五、集群验证 集群下启动一个Producer

在10.230.14.224远程窗口中:

cd /usr/local/kafka/bin
./kafka-console-producer.sh --broker-list 10.230.14.224:9092,10.230.14.146:9092,10.230.14.66:9092 --topic myschooling
集群下启动一个Consumer

在10.230.14.146远程窗口中:

cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 10.230.14.224:9092,10.230.14.146:9092,10.230.14.66:9092 --topic myschooling

在Producer窗口中输入“kafka helloword”,如果在Consumer窗口中接收到相关消息则表明集群搭建成功。

通过zookeeper进一步验证

登录10.230.14.66机器的zk客户端:

cd /usr/local/zookeeper/bin/
./zkCli.sh -server 10.230.14.66:2181

六、kafka相关配置信息及命令

kafka相关配置信息及命令可以查看kafka官方文档。


注意:如果在多台机器上搭建集群出现无法连接的状况请检查防火墙,或者直接将防火墙关闭。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775080.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号