| 方式 | 优点 | 缺点 |
|---|---|---|
| client id | 简单便捷 | client id,一次只能有一个生产者实例,只能单并发 |
| user | 可以多 producer 同时进行,可与client id 进行组合,可以设置用户密码,增加一定的安全性,但用户名密码位置容易暴露 | 需要对kafka 开启安全认证,部署复杂行增加 |
使用方法
# 对 test_lz4_10m_client 进行限流 生产消费速率为 10 M/S ./bin/kafka-configs.sh --bootstrap-server xxxx:9092 --alter --add-config 'producer_byte_rate=10240,consumer_byte_rate=10240,request_percentage=200' --entity-type clients --entity-name test_lz4_10m_client基于 user 限流 kafka 认证方式
- SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0
- SASL/PLAIN - starting at version 0.10.0.0 (每次生效需要重启broker)
- SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0 (可动态增加用户)
- SASL/OAUTHBEARER - starting at version 2.0
其中 SASL/GSSAPI (Kerberos) 这种可能是生产环境使用最合适的,但是笔者这边暂时还没有使用 Kerberos ,所以这里主要使用 SASL/PLAIN 和 SASL/SCRAM-SHA-256 这两种做一个探索。
使用 SASL_PLAINTEXT/PLAIN 进行用户认证实现限流- broker 配置
#配置 ACL 入口类 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #本例使用 SASL PLAINTEXT listeners=SASL_PLAINTEXT://xxxxxx:9092 advertised.listeners=SASL_PLAINTEXT://xxxxxx:9092 security.inter.broker.protocol= SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN #设置本例中 admin 为超级用户 super.users=User:admin
- 创建 kafka_server_jaas.conf
# 这里是创建了两个用户admin和test user_admin 表示用户 admin,后面的 admin-secret 表示是 admin 的密码
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_test="123456";
};
- 启动脚本添加指定 kafka_server_jaas.conf
vim kafka-server-start.sh
# 添加 -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/kafka_server_jaas.conf exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
- 添加生产者消费者配置文件
# 生产者 producer_jaas.conf 消费者 consumer_jaas.conf 权限文件
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username = "test"
password="123456";
};
# 修改 kafka-console-producer.sh
## -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/producer_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/producer_jaas.conf kafka.tools.ConsoleProducer "$@"
# 修改 kafka-console-consumer.sh
## -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/consumer_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
- 使用
# 1. 进行授权对 用户 # 给用户 test 对 topic test_se 读的权限 ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Read --topic test_se # 给用户 test 的 group test-group 赋予读的权限 ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Read --group test-group # 给用户 test 赋予 test_se Write 的权限 ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xxxxxx:2181/kafka27 --add --allow-principal User:test --operation Write --topic test_se # 2. producer 使用 ./bin/kafka-console-producer.sh --bootstrap-server xxxxxx:9092 --topic test_se --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN # 3. consumer 使用 ./bin/kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --from-beginning --topic test_se --consumer.config ./config/console_consumer.conf使用 SASL_PLAINTEXT/SCRAM 进行用户认证实现限流
- broker 配置
###################### SASL ######################### sasl.enabled.mechanisms=SCRAM-SHA-256 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 security.inter.broker.protocol=SASL_PLAINTEXT listeners=SASL_PLAINTEXT://xxxxxx:9092 advertised.listeners=SASL_PLAINTEXT://xxxxxx:9092 ####################### ACL ######################## authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin
- 创建 kafka-broker-scram.jaas
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
- 指定 kafka-broker-scram.jaas 位置
修改 vim kafka-server-start.sh
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/kafka_2.13-2.7.1/config/auth/kafka-broker-scram.jaas kafka.Kafka "$@"
- 添加生产者消费者配置文件
# consumer-scram.conf 或 producer-scram.conf security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="123456";
- 授权
# 添加用户 ./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name test # 读权限 ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=xxxxxx:2181 --add --allow-principal User:"test" --consumer --topic 'test_topic' --group '*' # 写权限 ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=xxxxxx:2181 --add --allow-principal User:"test" --producer --topic 'test_topic' # 生产者 ./bin/kafka-console-producer.sh --broker-list xxxxxx:9092 --topic test_scram --producer.config config/auth/producer-scram.conf # 消费者 ./bin/kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic test_scram --consumer.config config/auth/consumer-scram.conf限流
# 对用户 test 限流 10M/S ./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name test # 对 client id 为 clientA 的限流 10M/S ./bin/kafka-configs.sh --zookeeper xxxxxx:2181/kafka27 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA # /bin/kafka-configs.sh --bootstrap-server xxxxxx:9092 --alter --add-config 'producer_byte_rate=10240,consumer_byte_rate=10240,request_percentage=200' --entity-type clients --entity-name test_lz4_10m_client
此处的限流应该是对单个 broker 限流为 10 M/S ,应为测试 topic 有 3 分区分别分布在三个 broker 所以总体限流大概在 30M/S 左右ß



