- 本着傻瓜式配置,可重复搭建kafka的指导思想编写本文档
- 快速满足安全加固要求
Static hostname: localhost.localdomain
Icon name: computer-vm
Chassis: vm
Machine ID: c744f9429d54f945b3b38d3eb7f3a591
Boot ID: 3fc16fddef1543deb3c57a6bac71a670
Virtualization: kvm
Operating System: CentOS Linux 7 (Core)
CPE OS Name: cpe:/o:centos:centos:7
Kernel: Linux 3.10.0-1160.49.1.el7.x86_64
Architecture: x86-64
版本信息
- kafka_2.11-2.1.0
- zookeeper-3.4.13
# 上传到/home/kafka目录 scp kafka_2.11-2.1.0.tgz root@192.168.2.21:/home/kafka scp zookeeper-3.4.13.tar.gz root@192.168.2.21:/home/kafka cd /home tar zxvf kafka_2.11-2.1.0.tgz tar zxvf zookeeper-3.4.13.tar.gz2. 添加SASL/PLAIN配置信息
为了满足测试,需要修改的文件如下
/home/kafka/kafka_2.11-2.1.0/config. ├── consumer.properties ├── kafka_server_jaas.conf ├── producer.properties ├── server.properties ├── zk_server_jaas.conf └── zookeeper.propertieskafka_server_jaas.conf
vi kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_test="testpass";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_test="testpass";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
producer.properties末尾追加如下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass";
server.properties
# 注释 #listeners=PLAINTEXT://0.0.0.0:9092
listeners=SASL_PLAINTEXT://localhost:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=false # 参数 allow.everyone.if.no.acl.found # 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问 # 设置为 false,ACL 机制改为白名单机制,只有白名单中的用户可以访问,默认值为 false auto.create.topics.enable=false super.users=User:admin zookeeper.set.acl=truezk_server_jaas.conf
vi zk_server_jaas.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
zookeeper.properties末尾追加如下配置
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000/home/kafka/kafka_2.11-2.1.0/bin
. ├── kafka-console-consumer.sh ├── kafka-console-producer.sh ├── kafka-server-start.sh ├── kafka-topics.sh ├── zookeeper-security-migration.sh ├── zookeeper-server-start.shkafka-console-consumer.sh、kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
fi
kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
fi
kafka-topics.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" # 找到当前行,在上面追加如下内容
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf" exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"zookeeper-security-migration.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@" # 找到当前行,在上面追加如下内容
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf" exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@"zookeeper-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M " # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/zk_server_jaas.conf"
fi
3. 启动服务
强烈建议准备全新安装之后的操作系统环境,避免重复验证时出现干扰因素,浪费时间排查问题
cd /home/kafka/kafka_2.11-2.1.0/ # 新开终端,执行如下命令 ./bin/kafka-server-start.sh config/server.properties # 新开终端,执行如下命令 ./bin/zookeeper-server-start.sh config/zookeeper.properties # 新开终端,执行如下命令,创建成功后,再启动生产者、消费者 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test # 新开终端,执行如下命令 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --producer.config=cig/producer.properties --topic test # 新开终端,执行如下命令 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/consumer.properties --topic test --from-beginning参考文档
-
写的太好了=》给 Kafka 配置 SASL/PLAIN 认证 | Kyle’s Blog
本人找到这篇文章前,确实也遇到了相同的问题
定义用户的方式非常奇葩,很难理解。username 和 password 两字段定义 Kafka brokers 内部沟通的用户密码,user_用户名 配置的是 client 连接 broker 时用的用户、密码。据我尝试,必须定义一个 username 用户对应的 user_ 字段,否则连不上。就像上面,有个 username=”testuser” ,所以必须再定义一次 user_testuser 且密码保持一致。此外可以再添加新用户,如添加 user_alice=”alice-secret” 。 -
10.0 Zookeeper 权限控制 ACL | 菜鸟教程 (runoob.com)
-
Apache Kafka
-
Welcome — ZooNavigator Docs (elkozmon.com)
docker run --rm -e HTTP_PORT=9000 --name zoonavigator -p 9000:9000 elkozmon/zoonavigator:latest



