本文介绍常用的topic管理命令,主要涉及kafka-topics脚本, kafka-reassign-partitions和kafka-config脚本,前者是专门的topic相关的脚本,中间的是分区重分配相关的脚本,后者是配置相关的脚本,不仅可以管理topic,还能管理broker,consumer等。
1.topic的管理: 增kafka-topics --bootstrap-server broker_host:port --create --topic删--partitions 1 --replication-factor 1
kafka-topics --bootstrap-server broker_host:port --delete --topic查
查用户建的某个主题的详细参数describe
kafka-topics --bootstrap-server broker_host:port --describe --topic
查看所有的主题
kafka-topics --bootstrap-server broker_host:port --list改
修改主题分区(数量只能增不能减,否则会跑出invalidPartitionsException的异常)
kafka-topics --bootstrap-server broker_host:port --alter --topic--partitions
修改主题级别的参数,比如max.message.bytes
kafka-configs --zookeeper zookeeper_host:port --entity-type topics --entity-name--alter-config max.message.bytes=100485760
变更副本数量
分为两个步骤:
1.提供一个json文件(reassign.json)去指定所有的topic的partition和replicas,replicas列表是broker_id,列表的第一个broker_id是leader replica所在的broker。
{"version":1, "partitions":[
{"topic":"topic_name","partition":0,"replicas":[0,1,2]},
{"topic":"topic_name","partition":1,"replicas":[0,2,1]},
{"topic":"topic_name","partition":2,"replicas":[1,0,2]},
{"topic":"topic_name","partition":3,"replicas":[1,2,0]}
]}
2.使用命令
kafka-reassign-partitions --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
修改主题限速
kafka-configs --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=1004857600,follower.replication.throttled.rate=104857600'
主题分区迁移
kafka-reassign-partitions2.特殊的主题管理
内部主题:__consumer_offsets和__transaction_state
前者是记录consumer消费的进度的,后者是支持事物机制后引入的
查看位移提交数据
kafka-console-consumer --bootstrap-server host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupmetadataManager$OffsetsMessageFormatter" --from-beginning
读取__consumer_offset主题消息,查看消费者组的状态信息
kafka-console-consumer --bootstrap-server host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupmetadataManager$GroupmetadataMessageFormatter" --from-beginning3.运行命令常见的错误
1.删除主题失败
可能原因:
broker down删除的topic部分分区依然在执行迁移过程
解决:
手动删除zookeeper节点/admin/delete_topic/ 2.__consumer_offsets占用太多的磁盘 可能原因: cleaner线程挂了,无法清理此内部主题
解决: 显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态,如果是这个原因导致的,就重启相应的 Broker
参考资料: kafka核心技术与实战-极客时间



