一、创建/删除/重建topic
1、创建:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topic TOPIC_NAME
2、删除:./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TOPIC_NAME
3、查看状态和分区负载详情:./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TOPIC_NAME
二、设置过期时间
以下例子为保存7天
处理方法:
1、设置:./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --alter --entity-name TOPIC_NAME --add-config retention.ms=604800000
2、删除:./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --alter --entity-name TOPIC_NAME --delete-config retention.ms
三、删除某个offset之前的数据
说明:需求是删除某个offset之前的数据;一般来说,直接重建topic比较快,但是不排除不能重建topic的情况
处理方法:
1、创建json文件,以下例子是删除topic:test_product,分区:0,offset:10之前的数据,不包含offset 10,改操作不会重置offset
示例:{"partitions":[{"topic": "test_product", "partition": 0,"offset": 10}],"version":1}
2、执行命令:./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file cleanup.json
四、kafka磁盘报警
处理方法:
1、查看哪个topic数据量比较大,进入kafka数据目录,执行命令查看分区数大于1G的:du -sh * |grep G (注意:不一定是topic数据量大,也有可能是日志之类)
2、查看数据量最大的topic是否设置过期时间,如果未设置,和业务部门确认是否可以设置;如果已设置,是否需要缩短时间或者重建topic
3、如果以上方法都无法解决,让业务部门提单扩容磁盘。
五、kafka_exporter报警
处理方法:一般情况是CPU使用率过高导致拉取不到监控数据,也有可能是网络问题;查看监控面板,如果CPU使用率突然升高需要确认,如果CPU持续较高需要升级配置。
六、重置消费者offset
说明:有些场景可能希望修改消费者消费到的offset位置,以达到重新消费,或者跳过一部分消息的目的
处理方法:
1、将指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重启消费者后,消费中将从指定的offset处消费。
注意这里只能NEW_OFFSET只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
2、将指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-earliest --topic TOPIC_NAME(从头消费)
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-latest --topic TOPIC_NAME(从最新消费)
七、查询topic的offset的范围
说明:一般很少用到,可以直接在kafdrop上面看,如果没有kafdrop的时候使用以下方法查询
处理方法:
查询offset的最小值(理解为消费者的offset)
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic TOPIC_NAME --time -2
查询offset的最大值(理解为生产者的offset)
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic TOPIC_NAME --time -1
八、导出某个topic的数据
说明:注意topic数据量大小和磁盘大小,最好不要重定向到根分区;如果需要下载先压缩再下载,压缩比很高
处理方法:从头开始消费重定向到文件
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_NAME --from-beginning > TOPIC_NAME
九、增加topic分区数(只能加不能减)
# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 12 --topic ec-applog-err
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# 这里给了一个警告 : 如果为具有关键字的主题增加了分区,则分区逻辑或消息的顺序将受到影响。
# 如果我们的业务场景对消息的顺序有着严格的要求, 一定要谨慎添加分区! 建议将之前所有的消息全部消费完才执行添加分区操作
十、控制台生产者/消费者
生产者:./bin/kafka-console-producer--broker-list localhost:9092 --topic test1
消费者:./bin/kafka-console-consumer--zookeeper localhost:2181 --topic test1 --from-beginning
十一、停止/启动服务
启动:./bin/kafka-server-start.sh -daemon config/server.properties
停止:./bin/kafka-server-stop.sh
十二、kafka鉴权配置
配置文件示例server.properties
| # ----------- security begin ----------- authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin sasl.enabled.mechanisms=SCRAM-SHA-512 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 security.inter.broker.protocol=SASL_PLAINTEXT listeners=SASL_PLAINTEXT://192.168.1.213:9092 advertised.listeners=SASL_PLAINTEXT://192.168.1.213:9092 # ----------- security end ----------- |
配置文件示例kafka_server_jaas.conf
| KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"; }; |
修改启动脚本,添加bin/kafka-server-start.sh
export KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/jaas.conf
授权脚本
| [root@data-kafka-auth-prod01 scripts]# cat chmod_read.sh #!/bin/sh #通过传入用户名、用户消费组作为参数、即可批量赋权 cat topic | while read line do kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:$1 --operation Read --group $2 --topic $line done [root@data-kafka-auth-prod01 scripts]# cat chmod_write.sh #!/bin/sh #通过传入用户名、即可批量赋写权限 cat topic | while read line do kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:$1 --operation Write --topic $line done |



