目录
前言
查看kafka版本
通过zookeeper连接kafka查询命令
kafka查看主题
kafka查看主题详情
前言
所有命令都是在linux系统执行
查看kafka版本
进入安装后目录执行以下命令:
find ./libs/ -name *kafka_* | head -1 | grep -o 'kafka[^n]*'
会看到结果:kafka_2.12-1.0.0-javadoc.jar.asc,其中,2.12为scala版本,1.0.0为kafka版本。
通过zookeeper连接kafka执行命令
预制条件:
1. 登录其中的一个broker,/opt/cloudera/parcels/CDH/lib/kafka/bin)
2. 清除KAFKA_OPTS的鉴权值(可以重新登录自动清除)
查看主题
./kafka-topics.sh --list --zookeeper localhost:2181/kafka
查看主题详情
./kafka-topics.sh --list --zookeeper localhost:2181/kafka
创建主题./kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test_yp --partitions 1 --replication-factor 1
删除主题./kafka-topics.sh --delete --zookeeper localhost:2181/kafka --topic test_yp
注意:当/kafka安装目录/config/server.properties文件中delete.topic.enable设置为true时才生效。
需要直接连接kafka操作的命令如果需要鉴权的话,进行如下操作:
1. 准备如下三个文件:
| /home/ic.dev.yang.peng48/client.properties | group.id= // 消费组,同一消费组消息只能消费一次 security.protocol= SASL_PLAINTEXT sasl.kerberos.service.name= // 设置服务名,和server段保持一致 |
| /home/ic.dev.yang.peng48/jaas.conf | 鉴权信息 |
| /home/ic.dev.yang.peng48/krb5.conf | 鉴权信息 |
2. 设置KAFKA_OPTS,export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ic.dev.yang.peng48/jaas.conf -Djava.security.krb5.conf=/home/ic.dev.yang.peng48/krb5.conf"
生产消息./kafka-console-producer.sh --broker-list worker01-cdpdev-ic:9092,worker02-cdpdev-ic:9092,worker03-cdpdev-ic:9092 --topic test_yp --producer.config /home/ic.dev.yang.peng48/client.properties
消费消息./kafka-console-consumer.sh --bootstrap-server worker01-cdpdev-ic:9092,worker02-cdpdev-ic:9092,worker03-cdpdev-ic:9092 --topic test_yp --from-beginning --consumer.config /home/ic.dev.yang.peng48/client.properties
查询消费组./kafka-consumer-groups.sh --bootstrap-server worker01-cdpdev-ic:9092,worker02-cdpdev-ic:9092,worker03-cdpdev-ic:9092 --list --command-config /home/ic.dev.yang.peng48/client.properties
查询具体消费组情况./kafka-consumer-groups.sh --bootstrap-server worker01-cdpdev-ic:9092,worker02-cdpdev-ic:9092,worker03-cdpdev-ic:9092 --describe --group test_group --command-config /home/ic.dev.yang.peng48/client.properties



