- 服务器提前准备
- 1.准备文件
- 2.安装jdk(三台机)
- 3.安装zookeeper集群
- 3.1解压与生成配置文件
- 3.2 修改zookeeper中admin server的8080端口或停用内嵌的管理控制台(可选)
- 3.4 zookeeper集群启动
- 4.kafka集群安装
- 4.1解压并修改配置文件
- 4.1.1解压并配置192.168.56.101
- 4.1.2解压并配置192.168.56.102
- 4.1.2解压并配置192.168.56.103
- 4.2 kafka集群启动停止命令
- 5.kafka集群使用命令
此文档为Kafka集群环境搭建文档,如果只是需要单机kafka版本, 请点此查看
服务器提前准备| Ip | Hostname |
|---|---|
| 192.168.56.101 | kafkaserver1 |
| 192.168.56.102 | kafkaserver2 |
| 192.168.56.103 | kafkaserver3 |
注:此文档建立在防火墙停止或已开放端口的前提下,要保证防火墙已开放相应的通讯端口2181及9092
##Centos下停止防火墙命令 systemctl stop firewalld ##开放端口2181及9092命令: firewall-cmd --zone=public --add-port=2181/tcp --permanent firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --reload1.准备文件
jdk-8u301-linux-x64.tar.gz 或其他Jdk8版本 —> JDK
kafka_2.12-2.4.1.tgz —> kafka安装包
apache-zookeeper-3.5.7-bin.tar.gz —> zookeeper安装包
jdk-8u301-linux-x64.tar.gz从oracle官网下载 https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
Kafka:
https://archive.apache.org/dist/kafka/2.4.1/kafka_2.12-2.4.1.tgz
Zookeeper:
http://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -xvf jdk-8u301-linux-x64.tar.gz -C /usr/local cat >> /etc/profile << EOF export JAVA_HOME=/usr/local/jdk1.8.0_301/ export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar EOF source /etc/profile3.安装zookeeper集群 3.1解压与生成配置文件
先在三台机器上执行以下操作,内容一致
#解压到目录/opt(你可以选择其他),并建立data目录 tar -xvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/ mkdir /opt/apache-zookeeper-3.5.7-bin/data #配置文件生成 cat > /opt/apache-zookeeper-3.5.7-bin/conf/zoo.cfg << EOF tickTime=2000 dataDir=/opt/apache-zookeeper-3.5.7-bin/data dataLogDir=/opt/apache-zookeeper-3.5.7-bin/logs clientPort=2181 initLimit=10 syncLimit=5 server.1=192.168.56.101:2888:3888 server.2=192.168.56.102:2888:3888 server.3=192.168.56.103:2888:3888 admin.serverPort=8080 EOF
然后分别在三台机上根据按顺序分配相应的myid,如下
####在192.168.56.101上执行 echo 1 > /opt/apache-zookeeper-3.5.7-bin/data/myid ####在192.168.56.102上执行 echo 2 > /opt/apache-zookeeper-3.5.7-bin/data/myid ####在192.168.56.103上执行 echo 3 > /opt/apache-zookeeper-3.5.7-bin/data/myid3.2 修改zookeeper中admin server的8080端口或停用内嵌的管理控制台(可选)
zookeeper3.5.7版本中有个内嵌的管理控制台是通过jetty启动,会占用8080 端口,如上配置在zoo.cfg中的admin.serverPort,可以通过修改此配置来更换端口。
或者干脆直接停用这个服务通过在启动脚本中增加"-Dzookeeper.admin.enableServer=false",我是直接停止这个服务的,方法如下:
vi /opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh 找到-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &这一行,大概在161行 在$JVMFLAGS后面增加"-Dzookeeper.admin.enableServer=false",如图:
注:建议三台机上都修改,保存不要遗漏,同步配置
3.4 zookeeper集群启动启动停止查看状态,在三台机上分别执行以下命令进行集群的状态维护,比如启动集群,在三台机上同时执行/opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start:
/opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start /opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh stop /opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh status4.kafka集群安装 4.1解压并修改配置文件
集群基本上使用默认配置就好,我这里是进行了如brokerid、num.partitions、offsets.topic.replication.factor、zookeeper.connect、zookeeper.connection.timeout.ms的修改。
brokerid,当前节点的id号,唯一标识,建议给顺序数字,方便管理
num.partitions,控制设定几个分区,default.replication.factor,控制设定几个备份。我有三个节点,所以各给了3,也可根据自身实际情况以及自身需求给定
zookeeper.connect指定外部zk源的各个节点。若无特殊指定,外部zk集群默认端口2181
zookeeper.connection.timeout.ms根据自身网络情况设定,通常默认就好
default.replication.factor:副本个数,默认为1,如果需要保持高可用,可以设置成3
以下参数也建议设置修改,默认是1,没有高可用,
default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3
具体三台配置过程详情如下:
4.1.1解压并配置192.168.56.101##解压 tar -xvf kafka_2.12-2.4.1.tgz -C /opt/ ####修改/opt/kafka_2.12-2.4.1/config/server.properties broker.id=1 num.partitions=3 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 isteners=PLAINTEXT://192.168.56.101:9092 advertised.listeners=PLAINTEXT://192.168.56.101:9092 log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs log.retention.hours=24 zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 zookeeper.connection.timeout.ms=60000 delete.topic.enable=true auto.create.topics.enable=false
附上直接执行的linux命令,方便server.properties的操作修改
sed -i 's/^broker.id=0/broker.id=1/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^num.partitions=1/num.partitions=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^offsets.topic.replication.factor=1/offsets.topic.replication.factor=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^transaction.state.log.replication.factor=1/transaction.state.log.replication.factor=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^transaction.state.log.min.isr=1/transaction.state.log.min.isr=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^#listeners=PLAINTEXT://:9092/listeners=PLAINTEXT://192.168.56.101:9092/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^#advertised.listeners=PLAINTEXT://your.host.name:9092/advertised.listeners=PLAINTEXT://192.168.56.101:9092/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^log.dirs=/tmp/kafka-logs/log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^log.retention.hours=168/log.retention.hours=24/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^zookeeper.connect=localhost:2181/zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=60000/' /opt/kafka_2.12-2.4.1/config/server.properties cat >> /opt/kafka_2.12-2.4.1/config/server.properties << EOF default.replication.factor=3 delete.topic.enable=true auto.create.topics.enable=false EOF4.1.2解压并配置192.168.56.102
##解压 tar -xvf kafka_2.12-2.4.1.tgz -C /opt/ ####修改/opt/kafka_2.12-2.4.1/config/server.properties broker.id=2 num.partitions=3 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 isteners=PLAINTEXT://192.168.56.102:9092 advertised.listeners=PLAINTEXT://192.168.56.102:9092 log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs log.retention.hours=24 zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 zookeeper.connection.timeout.ms=60000 delete.topic.enable=true auto.create.topics.enable=false
附上直接执行的linux命令,方便server.properties的操作修改
sed -i 's/^broker.id=0/broker.id=2/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^num.partitions=1/num.partitions=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^offsets.topic.replication.factor=1/offsets.topic.replication.factor=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^transaction.state.log.replication.factor=1/transaction.state.log.replication.factor=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^transaction.state.log.min.isr=1/transaction.state.log.min.isr=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^#listeners=PLAINTEXT://:9092/listeners=PLAINTEXT://192.168.56.102:9092/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^#advertised.listeners=PLAINTEXT://your.host.name:9092/advertised.listeners=PLAINTEXT://192.168.56.102:9092/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^log.dirs=/tmp/kafka-logs/log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^log.retention.hours=168/log.retention.hours=24/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^zookeeper.connect=localhost:2181/zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=60000/' /opt/kafka_2.12-2.4.1/config/server.properties cat >> /opt/kafka_2.12-2.4.1/config/server.properties << EOF default.replication.factor=3 delete.topic.enable=true auto.create.topics.enable=false EOF4.1.2解压并配置192.168.56.103
##解压 tar -xvf kafka_2.12-2.4.1.tgz -C /opt/ ####修改/opt/kafka_2.12-2.4.1/config/server.properties broker.id=3 num.partitions=3 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 isteners=PLAINTEXT://192.168.56.102:9092 advertised.listeners=PLAINTEXT://192.168.56.102:9092 log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs log.retention.hours=24 zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 zookeeper.connection.timeout.ms=60000 delete.topic.enable=true auto.create.topics.enable=false
附上直接执行的linux命令,方便server.properties的操作修改
sed -i 's/^broker.id=0/broker.id=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^num.partitions=1/num.partitions=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^offsets.topic.replication.factor=1/offsets.topic.replication.factor=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^transaction.state.log.replication.factor=1/transaction.state.log.replication.factor=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^transaction.state.log.min.isr=1/transaction.state.log.min.isr=3/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^#listeners=PLAINTEXT://:9092/listeners=PLAINTEXT://192.168.56.103:9092/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^#advertised.listeners=PLAINTEXT://your.host.name:9092/advertised.listeners=PLAINTEXT://192.168.56.103:9092/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^log.dirs=/tmp/kafka-logs/log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^log.retention.hours=168/log.retention.hours=24/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^zookeeper.connect=localhost:2181/zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181/' /opt/kafka_2.12-2.4.1/config/server.properties sed -i 's/^zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=60000/' /opt/kafka_2.12-2.4.1/config/server.properties cat >> /opt/kafka_2.12-2.4.1/config/server.properties << EOF default.replication.factor=3 delete.topic.enable=true auto.create.topics.enable=false EOF4.2 kafka集群启动停止命令
在三台集群机器上都执行对应的启动命令即可
# 后台启动kafka,在三台集群机器上都执行 cd /opt/kafka_2.12-2.4.1 ./bin/kafka-server-start.sh -daemon config/server.properties # 如果要停止集群,在三台集群机器上都执行: ./bin/kafka-server-stop.sh5.kafka集群使用命令
先cd到kafka的bin目录 cd /opt/kafka_2.12-2.4.1/bin ##显示topic列表 ./kafka-topics.sh --list --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 ##创建topic ./kafka-topics.sh --create --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest ##删除topic,建议使用--bootstrap-server参数的命令 ./kafka-topics.sh --delete --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest ##查看topic的描述信息 ./kafka-topics.sh --describe --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest ##启动命令行生产者进行测试 ./kafka-console-producer.sh --broker-list 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest ##启动命令行消费者进行测试 ./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --from-beginning --topic topictest



