在上一篇文章中,我为大家介绍了PLAIN认证方式,即通过在配置文件里面配置用户名密码的方式提供安全认证。但是PLAIN认证在实际的生产环境中很少被使用,因为用户名密码是在配置文件里面写死的,所以无法动态的新增或删除用户。在实际生产环境中,这是不太能接受的,生产环境不能因为你需要新增或删除用户就重启。那么下面我们为大家介绍的SCRAM认证方式就是非常常用的安全认证方式,它支持动态新增或删除用户。
文章目录- 一、创建超级用户凭证
- 二、服务端配置
- 2.1.修改`server.properties`
- 2.2.配置初始通信用户密码
- 2.3.修改启动脚本
- 三、动态新增删除普通用户
- 3.1.添加zimug用户凭证
- 3.2.查看zimug用户凭证
- 3.3.删除zimug用户凭证
- 四、配置kafka客户端
- 五、apache kafka参数配置
- 六、Spring kafka参数配置
先创建一个admin超级用户凭证,这个用户是用于kafka集群各节点之间,以及controller与broker之间通信认证的用户,以及面向kafka集群管理的的超级用户,不是面向应用及开发者使用的用户。
如果是kafka2.0版本,依赖于zookeeper(保证zookeeper处于运行状态),即可执行下列命令。
> bin/kafka-configs.sh --zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=adsecret],SCRAM-SHA-512=[password=adsecret]' --entity-type users --entity-name admin
如果是kafka3.0版本,需要执行这个命令,只有一个参数不同。
> bin/kafka-configs.sh --bootstrap-server:9092 --alter --add-config 'SCRAM-SHA-256=[password=adsecret],SCRAM-SHA-512=[password=adsecret]' --entity-type users --entity-name admin
使用该命令创建admin用户,密码是adsecret。
二、服务端配置 2.1.修改server.propertiesvim config/server.properties,将listeners的协议权限配置的部分修改为如下内容
listeners=SASL_PLAINTEXT://2.2.配置初始通信用户密码:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=SCRAM-SHA-256 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
新建一个文件vim config/kafka_server_jaas.conf,添加如下内容:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="adsecret";
};
不是说不用配置用户名密码么?这个密码是初始通信用户密码,用于集群broker之间通信用的,不是面向用户的。普通用户的新增和删除是可以动态的,而这个初始通信用户密码需要在配置文件中指定(和上文中创建的admin用户密码必须是一样的)。
2.3.修改启动脚本修改kafka启动脚本vim bin/kafka-server-start.sh,添加如下的启动参数,注意kafka_server_jaas.conf文件路径修改为你自己的安装路径。放在脚本正文(不包括注释)的第一行即可。
export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/opt/kafka/default/config/kafka_server_jaas.conf"
修改启动脚本之后,使用该脚本启动kafka集群内各个实例。
以上的操作步骤,实际都应该在kafka集群服务搭建的时候完成。以下的操作才是平时运维人员的动态新增删除用户的操作。
三、动态新增删除普通用户 3.1.添加zimug用户凭证> bin/kafka-configs.sh --bootstrap-server:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=xxyyzz345],SCRAM-SHA-512=[password=xxyyzz345]' --entity-type users --entity-name zimug --command-config ./super-userfile
新建一个文件叫做super-userfile,其实这个文件名可以随便,该文件保存的是超级用户的认证信息。因为此时集群已经添加了SCRAM认证机制,只有提供超级用户的用户密码才能创建普通用户。
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="adsecret";3.2.查看zimug用户凭证
> bin/kafka-configs.sh --bootstrap-server:9092 --describe --entity-type users --entity-name zimug --command-config ./super-userfile
响应结果
SCRAM credential configs for user-principal 'zimug' are SCRAM-SHA-256=iterations=8192, SCRAM-SHA-512=iterations=40963.3.删除zimug用户凭证
bin/kafka-configs.sh --bootstrap-server:9092 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name zimug --command-config ./super-userfile
bin/kafka-configs.sh --bootstrap-server四、配置kafka客户端:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name zimug --command-config ./super-userfile
服务端调整完成之后,我们经常测试使用的kafka-console-producer.sh和kafka-console-consumer.sh也必须添加权限验证配置才能访问集群。配置方法和上一篇文章中《用户名密码PLAIN认证-模拟客户端shell脚本调整》是一致的,只不过vim config/kafka_client_jaas.conf文件的内容需要换一下。
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="zimug"
password="xxyyzz345";
};
vim bin/kafka-console-producer.sh和vim bin/kafka-console-consumer.sh脚本中找到x$KAFKA_HEAP_OPTS,添加以下参数:
-Djava.security.auth.login.config=/opt/kafka/default/config/kafka_client_jaas.conf
以上2个脚本中都要添加这个启动参数,如图:
然后使用下面的脚本完成测试(测试方法参考《集群可用性验证及配置》)
# 开启生产者: /opt/kafka/default/bin/kafka-console-producer.sh --broker-list:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=SCRAM-SHA-256
# 开启消费者: /opt/kafka/default/bin/kafka-console-consumer.sh --bootstrap-server五、apache kafka参数配置:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256
把下面的代码加入apacke kafka的Java API的生产者或消费者配置参数中,生产者和消费者客户端就能够使用SCRAM方式以及用户名和密码连接kafka集群。
props.setProperty ("security.protocol", "SASL_PLAINTEXT");
props.setProperty ("sasl.mechanism", "SCRAM-SHA-256");
String jassc = "org.apache.kafka.common.security.scram.ScramLoginModule requiredn"
+ "username = "" + "zimug" + ""n"
+ "password ="" + "xxyyzz345" + "";";
props.setProperty("sasl.jaas.config", jassc);
六、Spring kafka参数配置
对于Spring kafka使用SCRAM方式登录认证,做如下配置。
spring:
kafka:
consumer: #这里可以是producer
properties:
sasl.mechanism: SCRAM-SHA-256
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='zimug' password='xxyyzz345';



