一、kafka和zk的安装二、修改配置
修改kafka配置
创建kafka消息存放目录第一台机器上server.properties的配置:第二台机器上server.properties的配置:第三台机器上server.properties的配置: 三、启动服务
启动zookeeper服务分别启动3个机器上的kafka 四、集群下创建Topic
创建一个名为myschooling的topic,设置3个副本、30个分区查看已经创建的 topics 五、集群验证
集群下启动一个Producer集群下启动一个Consumer
通过zookeeper进一步验证
六、kafka相关配置信息及命令
*说明:本文介绍使用外部zk集群搭建kafka集群。
kafka单机版安装可参考:CentOS安装kafka-2.8.0单机版
kafka单机版伪集群可参考:CentOS7安装Kafka 2.8.0单机集群(伪集群)
zk集群安装可参考:linux下安装zookeeper集群-3.6.3版本
二、修改配置 修改kafka配置
修改配置文件中的broker.id分别为1、2、3;listeners这一行取消注释,端口号分别为9092、9092、9092;log.dirs分别设置为kafkadata/kafkadata0~kafkadata/kafkadata9(先创建文件夹) 创建kafka消息存放目录
cd /usr/local/kafka mkdir -p kafkadata/kafkadata0 kafkadata/kafkadata1 kafkadata/kafkadata2 kafkadata/kafkadata3 kafkadata/kafkadata4 kafkadata/kafkadata5 kafkadata/kafkadata6 kafkadata/kafkadata7 kafkadata/kafkadata8 kafkadata/kafkadata9第一台机器上server.properties的配置:
broker.id=1 listeners = PLAINTEXT://10.230.14.224:9092 log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9 zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181第二台机器上server.properties的配置:
broker.id=2 listeners = PLAINTEXT://10.230.14.146:9092 log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9 zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181第三台机器上server.properties的配置:
broker.id=3 listeners = PLAINTEXT://10.230.14.66:9092 log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9 zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181
总得配置(各配置项说明):
############################# Server Basics ############################# #The id of the broker. This must be set to a unique integer for each broker. broker.id=1 ############################# Socket Server Settings ############################# listeners=PLAINTEXT://10.230.14.224:9092 #Hostname and port the broker will advertise to producers and consumers. If not set, #it uses the value for "listeners" if configured. Otherwise, it will use the value #returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 #The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 #The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 #The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 #The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 #The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# #A comma separated list of directories under which to store log files log.dirs=/usr/local/kafka/kafkadata/kafkadata0,/usr/local/kafka/kafkadata/kafkadata1,/usr/local/kafka/kafkadata/kafkadata2,/usr/local/kafka/kafkadata/kafkadata3,/usr/local/kafka/kafkadata/kafkadata4,/usr/local/kafka/kafkadata/kafkadata5,/usr/local/kafka/kafkadata/kafkadata6,/usr/local/kafka/kafkadata/kafkadata7,/usr/local/kafka/kafkadata/kafkadata8,/usr/local/kafka/kafkadata/kafkadata9 #The default number of log partitions per topic. More partitions allow greater #parallelism for consumption, but this will also result in more files across the brokers. num.partitions=30 #The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. #This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=3 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# #The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# #The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 #A size-based retention policy for logs. Segments are pruned from the log unless the remaining #segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 #The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 #The interval at which log segments are checked to see if they can be deleted according #to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# #Zookeeper connection #zookeeper.connect=localhost:2181 zookeeper.connect=10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181 #Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 ############################# Group Coordinator Settings ############################# #The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. #The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. #The default value for this is 3 seconds. #We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. #However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=3 default.replication.factor=3 replica.fetch.max.bytes=2048000 num.replica.fetchers=4 auto.create.topics.enable=false replica.socket.timeout.ms=30000 replica.socket.receive.buffer.bytes=65536 replica.lag.max.messages=4000 replica.lag.time.max.ms=10000 delete.topic.enable=true
kafka服务器配置文件说明
# ----------------------系统相关---------------------- # broker的全局唯一编号,不能重复,和zookeeper的myid是一个意思 broker.id=0 # broker监听IP和端口也可以是域名 listeners=PLAINTEXT://172.17.56.175:9092 # 用于接收请求的线程数量 num.network.threads=3 # 用于处理请求的线程数量,包括磁盘IO请求,这个数量和log.dirs配置的目录数量有关,这里的数量不能小于log.dirs的数量, # 虽然log.dirs是配置日志存放路径,但是它可以配置多个目录后面用逗号分隔 num.io.threads=8 # 发送缓冲区大小,也就是说发送消息先发送到缓冲区,当缓冲区满了之后一起发送出去 socket.send.buffer.bytes=102400 # 接收缓冲区大小,同理接收到缓冲区,当到达这个数量时就同步到磁盘 socket.receive.buffer.bytes=102400 # 向kafka套接字请求最大字节数量,防止服务器OOM,也就是OutOfMemery,这个数量不要超过JAVA的堆栈大小, socket.request.max.bytes=104857600 # 日志路径也就是分区日志存放的地方,你所建立的topic的分区就在这里面,但是它可以配置多个目录后面用逗号分隔 log.dirs=/tmp/kafka-logs # 消息体(也就是往Kafka发送的单条消息)最大大小,单位是字节,必须小于socket.request.max.bytes值 message.max.bytes =5000000 # 自动平衡由于某个broker故障会导致Leader副本迁移到别的broker,当之前的broker恢复后也不会迁移回来,有时候我们需要 # 手动进行平衡避免同一个主题不同分区的Leader副本在同一台broker上,下面这个参数就是开启自动平衡功能 auto.leader.rebalance.enable=true #设置了上面的自动平衡,当故障转移后,隔300秒(默认)触发一个定时任务进行平衡操作,而只有代理的不均衡率为10%以上才会执行 leader.imbalance.check.interval.seconds=300 # 设置代理的不均衡率,默认是10% leader.imbalance.per.broker.percentage=10 # ---------------分区相关------------------------- # 默认分区数量,当建立Topic时不指定分区数量,默认就1 num.partitions=1 # 是否允许自动创建topic ,若是false,就需要通过命令创建topic auto.create.topics.enable =true # 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数 default.replication.factor =2 # ---------------日志相关------------------------- # segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。 # 这里就是用来设置恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 # 日志文件中每个segment的大小,默认为1G。topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,当超过这个大小会建立一个新日志文件 # 这个参数会被topic创建时的指定参数覆盖,如果你创建Topic的时候指定了这个参数,那么你以你指定的为准。 log.segment.bytes=1073741824 # 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据 # log.retention.bytes和log.retention.minutes|hours任意一个达到要求,都会执行删除 # 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准 log.retention.hours|minutes=168 # 这个参数会在日志segment没有达到log.segment.bytes设置的大小默认1G的时候,也会强制新建一个segment会被 # topic创建时的指定参数覆盖 log.roll.hours=168 # 上面的参数设置了每一个segment文件的大小是1G,那么就需要有一个东西去定期检查segment文件有没有达到1G,多长时间去检查一次, # 就需要设置一个周期性检查文件大小的时间(单位是毫秒)。 log.retention.check.interval.ms=300000 # 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度, # 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准 log.cleanup.policy = delete # 是否启用日志清理功能,默认是启用的且清理策略为compact,也就是压缩。 log.cleaner.enable=false # 日志清理时所使用的缓存空间大小 log.cleaner.dedupe.buffer.size=134217728 # log文件"sync"到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段 # 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡. # 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞) # 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟. # 物理server故障,将会导致没有fsync的消息丢失. log.flush.interval.messages=9223372036854775807 # 检查是否需要固化到硬盘的时间间隔 log.flush.scheduler.interval.ms =3000 # 仅仅通过interval来控制消息的磁盘写入时机,是不足的. # 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔 # 达到阀值,也将触发. log.flush.interval.ms=None # -----------复制(Leader、replicas) 相关------------------- # partition leader与replicas之间通讯时,socket的超时时间 controller.socket.timeout.ms=30000 # replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas), # 并认为它是死的,不会再加入管理中 replica.lag.time.max.ms=10000 # follower与leader之间的socket超时时间 replica.socket.timeout.ms=300000 # leader复制时候的socket缓存大小 replica.socket.receive.buffer.bytes=65536 # replicas每次获取数据的最大大小 replica.fetch.max.bytes=1048576 # replicas同leader之间通信的最大等待时间,失败了会重试 replica.fetch.wait.max.ms=500 # fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件 replica.fetch.min.bytes=1 # leader 进行复制的线程数,增大这个数值会增加follower的IO num.replica.fetchers=1 # 最小副本数量 min.insync.replicas=2三、启动服务 启动zookeeper服务
分别启动3个机器上的kafkakafka需要依赖ZK,安装包中已经自带了一个ZK,也可以改成指定已运行的ZK。如果改成指定的ZK需要修改 kafka 安装目录下的config/server.properties 文件中的 zookeeper.connect 。这里我们使用外部zk。zk集群的安装上面有参考链接,安装好集群之后并启动。
cd /usr/local/kafka/bin nohup ./kafka-server-start.sh ../config/server.properties &四、集群下创建Topic 创建一个名为myschooling的topic,设置3个副本、30个分区
cd /usr/local/kafka/bin ./kafka-topics.sh --create --zookeeper 10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181 --replication-factor 3 --partitions 30 --topic myschooling查看已经创建的 topics
cd /usr/local/kafka/bin ./kafka-topics.sh -list -zookeeper 10.230.14.224:2181,10.230.14.146:2181,10.230.14.66:2181
在10.230.14.224远程窗口中:
cd /usr/local/kafka/bin ./kafka-console-producer.sh --broker-list 10.230.14.224:9092,10.230.14.146:9092,10.230.14.66:9092 --topic myschooling集群下启动一个Consumer
在10.230.14.146远程窗口中:
cd /usr/local/kafka/bin ./kafka-console-consumer.sh --bootstrap-server 10.230.14.224:9092,10.230.14.146:9092,10.230.14.66:9092 --topic myschooling
在Producer窗口中输入“kafka helloword”,如果在Consumer窗口中接收到相关消息则表明集群搭建成功。
登录10.230.14.66机器的zk客户端:
cd /usr/local/zookeeper/bin/ ./zkCli.sh -server 10.230.14.66:2181六、kafka相关配置信息及命令
kafka相关配置信息及命令可以查看kafka官方文档。
注意:如果在多台机器上搭建集群出现无法连接的状况请检查防火墙,或者直接将防火墙关闭。



