1、kafka集群的安装配置依赖zookeeper,搭建kafka集群之前,需要搭建好zookeeper集群
2、需要有jdk环境
搭建kafka集群解压安装包
tar -zxf kafka_2.12-2.7.0.tgz -C /opt
创建日志目录
mkdir /opt/kafka_2.12-2.7.0/logs
修改配置
vi /opt/kafka_2.12-2.7.0/config/server.properties
broker.id=1 #broker的全局唯一编号不能重复,建议与zookeeper的myid对应 listeners=PLAINTEXT://$IP:9092 #broker监听ip和端口 num.network.threads=3 #borker进行网络处理的线程数 num.io.threads=8 #borker进行I/O处理的线程数 socket.send.buffer.bytes=102400 #发送缓冲区大小,即发送消息先发送到缓冲区,当缓冲区满了在一起发出去 socket.receive.buffer.bytes=102400 #接收缓冲区大小,接收消息先放到接收缓冲区,当达到这个数量时同步到磁盘 socket.request.max.bytes=104857600 #向kafka套接字请求的最大字节数量,防止服务器outofmemory,大小最好不要超过java的堆栈大小 log.dirs=/opt/kafka_2.12-2.7.0/logs #消息存放目录,不是日志目录 num.partitions=1 #每个topic的默认分区数 num.recovery.threads.per.data.dir=1 #处理消息目录的线程数,若设置了3个消息路径,改参数为2,那么一共需要6个线程 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 #消息过期时间,默认为1周 log.segment.bytes=1073741824 #日志文件中每个segment的大小,默认为1G,topic的分区是以一堆segment文件存储的,超过此限制会建立一个新的日志文件。此参数若在创建topic时的指定,那么参数覆盖,以指定的为准 log.retention.check.interval.ms=300000 #如上设置了每个segment文件大小为1G,那么此时间间隔就是检查他的大小有没有达到1G,检查的时间间隔 zookeeper.connect=192.168.18.237:2181,192.168.18.238:2181,192.168.18.239:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
配置文件解释:
log.dirs:Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
num.recovery.threads.per.data.dir:
对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:
• 服务器正常启动,用于打开每个分区的日志片段;
• 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
• 服务器正常关闭,用于关闭日志片段
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的
服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。
测试检验bin目录下启动kafka
kafka-server-start.sh -daemon./config/server.properties
使用jps可以看到kafka进程
zookeeper+kafka集群消息发布创建一个topic,3个分区,3个副本
./bin/kafka-topics.sh --create --zookeeeper 192.168.18.237:2181,192.168.18.238:2181,192.168.18.239:2181 -replication-factor 3 --partitions 3 --topic test
查看topic列表
./bin/kafka-topics.sh --list --zookeeper 192.168.18.237:2181,192.168.18.238:2181,192.168.18.239:2181
用一台服务器模拟生产者,发布消息
./bin/kafka-console-producer.sh --broker-list 192.168.18.237:9092 --topic test
用另一台服务器模拟消费者,消费消息
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic test --from-beginning
注:–from-beginning 表示从头接收,否则只能从当前的offset接收新消息



