栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

kafka运维常用命令

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka运维常用命令

管理topic相关的

1. 创建topic

./kafka-topics.sh --create --partitions 1 --replication-factor 2 --topic test --zookeeper zk_host:port/chroot

2. 删除topic

./kafka-topics.sh --delete --topic test --zookeeper zk_host:port/chroot

3. 查看topic描述信息

./kafka-topics.sh --describe --topic test --zookeeper zk_host:port/chroot

4. 列出所有topic

./kafka-topics.sh --list --zookeeper zk_host:port/chroot

5. 增加topic分区数(注意不能减少)

./kafka-topics.sh --alter --topic test --partitions 40 --zookeeper zk_host:port/chroot

6. 添加或修改topic的config配置(注意配置项必须是kafka认识的不然不会成功)

./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test --config x=y

  或

./kafka-configs.sh --zookeeper zk_host:port/chroot --alter --entity-name test --entity-type topics --add-config x=y

  7. 删除topic的config配置

./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test --delete-config x

  或

./kafka-configs.sh --zookeeper zk_host:port/chroot --alter --entity-name test --entity-type topics --delete-config x

8. console端向topic生产数据

./kafka-console-producer.sh --broker-list xxx:9092 --topic test

9. console端消费数据(从头消费就加上 --from-beginning)

./kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic test

10. 查看 __consumer_offsets中的消息

./kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic __consumer_offsets --partition 35 --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

使用 OffsetsMessageFormatter 打印的格式可以概括为:

"[%s,%s,%d]::[OffsetMetadata[%d,%s],CommitTime %d,ExpirationTime %d]".format(group, topic, partition, offset, metadata, commitTimestamp, expireTimestamp)

11. 查看topic分区偏移量最大最小值

./kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list xxx:9092 --partitions 0

time值 -1表示latest即最大值,-2表示earliest即最小值,不指定--partitions选项则列出所有分区的 

12. 对 topic 分区 replica leader 进行负载均衡

 

./kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot --path-to-json-file xxx.json

不指定 --path-to-json-file的话就是对所有topic分区进行均衡,指定的话,格式如下:

{"partitions":[{"topic":"test","partition":0},{"topic":"test","partition":1}]}
管理consumer group相关的

1. 列出所有的group

./kafka-consumer-groups.sh --bootstrap-server xxx:9092 --list

2. 查看某个group的相关描述信息,比如消费到kafka的offset位置等

./kafka-consumer-groups.sh --bootstrap-server xxx:9092 --describe --group g1

使用--describe后,就可以添加一些附加选项,比如:

--members:只列出group中活动的成员,即没连接的不会显示

--members --verbose:除了列出活动的成员外,该选项还提供了分配给每个成员的topic分区

--offsets:这是默认的描述选项,并提供与“--describe”选项相同的输出。

--state:这个选项提供了有用的group-level信息

3. 删除group

./kafka-consumer-groups.sh --bootstrap-server xxx:9092 --delete --group g1

4. 重新设置group的消费的offsets位置(需要确保重设的group不是处于活动状态)

重设group的消费的offsets信息可以使用选项 --reset-offsets。

重设offset由三个维度决定:topic作用域、执行方案、重置策略。

topic作用域

--all-topics:group内的所有topic都重设
--topic t1 --topic t2:为指定的若干个topic重设offset
--topic t1:0,1,2:为指定的topic的指定分区重设offset

执行方案

什么都不加:只是打印出offset重设的方案,不具体执行

--execute:执行reset-offsets方案

--export:将offset重设方案打印出来到csv文件

重置策略

--to-datetime : 将偏移量重设为datetime时的偏移量,Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : 将偏移量重设为最早偏移量
--to-latest : 将偏移量重设为最新偏移量
--shift-by : 把偏移量设置为当前偏移量+N处,N可以是负数,表示向前移动
--from-file : 从csv文件中读取重设策略
--to-current : 将偏移量重设为当前偏移量
--by-duration : 将偏移量重设为距离当前给定时间间隔的位移处,Format: 'PnDTnHnMnS'(是一个符合ISO-8601规范的Duration格式,以字母P开头,后面由4部分组成,即 D、H、M和S,分别表示天、小时、分钟和秒。如果想将位移调回到15分钟前,可以指定PT0H15M0S。)
--to-offset : 将偏移量重设为指定的偏移量
示例:

./kafka-consumer-groups.sh --bootstrap-server xxx:9092 --reset-offsets --group g1 --topic topic1 --to-latest

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/840090.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号