栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka 命令行操作

Kafka 命令行操作

查看上篇文章:Kafka集群安装,配置一下环境变量,下面操作就不带绝对路径了。

比如 执行 /home/vagrant/kafka_2.12-3.0.0/bin/kafka-topics.sh 查看操作主题命令参数,直接 执行 kafka-topics.sh 就完事,因为配置了环境变量,有不明白的小伙伴,可以直接百度:Centos7 如何配置环境变量

主题命令行操作 查看操作主题命令参数

kafka-topics.sh 脚本使用参数列表:

[vagrant@localhost kafka_2.12-3.0.0]$ kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--at-min-isr-partitions                  if set when describing topics, only    
                                           show partitions whose isr count is   
                                           equal to the configured minimum.     
--bootstrap-server                               to.                                  
--command-config                     passed to Admin Client. This is used 
                                           only with --bootstrap-server option  
                                           for describing and altering broker   
                                           configs.                             
--config             A topic configuration override for the 
                                           topic being created or altered. The  
                                           following is a list of valid         
                                           configurations:                      
                                                cleanup.policy                        
                                                compression.type                      
                                                delete.retention.ms                   
                                                file.delete.delay.ms                  
                                                flush.messages                        
                                                flush.ms                              
                                                follower.replication.throttled.       
                                           replicas                             
                                                index.interval.bytes                  
                                                leader.replication.throttled.replicas 
                                                local.retention.bytes                 
                                                local.retention.ms                    
                                                max.compaction.lag.ms                 
                                                max.message.bytes                     
                                                message.downconversion.enable         
                                                message.format.version                
                                                message.timestamp.difference.max.ms   
                                                message.timestamp.type                
                                                min.cleanable.dirty.ratio             
                                                min.compaction.lag.ms                 
                                                min.insync.replicas                   
                                                preallocate                           
                                                remote.storage.enable                 
                                                retention.bytes                       
                                                retention.ms                          
                                                segment.bytes                         
                                                segment.index.bytes                   
                                                segment.jitter.ms                     
                                                segment.ms                            
                                                unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs. It is  
                                           supported only in combination with --
                                           create if --bootstrap-server option  
                                           is used (the kafka-configs CLI       
                                           supports altering topic configs with 
                                           a --bootstrap-server option).        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config            A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option). Not supported with 
                                           the --bootstrap-server option.       
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--exclude-internal                       exclude internal topics when running   
                                           list or describe command. The        
                                           internal topics will be listed by    
                                           default                              
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting or    
                                           describing topics, the action will   
                                           only execute if the topic exists.    
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist.        
--list                                   List all available topics.             
--partitions   The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected). If not supplied   
                                           for create, defaults to the cluster  
                                           default.                             
--replica-assignment                                            
--replication-factor                       partition in the topic being         
                                           created. If not supplied, defaults   
                                           to the cluster default.              
--topic                   The topic to create, alter, describe   
                                           or delete. It also accepts a regular 
                                           expression, except for --create      
                                           option. Put topic name in double     
                                           quotes and use the '' prefix to     
                                           escape regular expression symbols; e.
                                           g. "test.topic".                    
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-min-isr-partitions               if set when describing topics, only    
                                           show partitions whose isr count is   
                                           less than the configured minimum.    
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--version                                Display Kafka version.        
常用参数
参数描述
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。
–create创建主题。
–delete删除主题。
–alter修改主题。
–list查看所有主题。
–describe查看主题详细描述。
–partitions 设置分区数。
–replication-factor设置分区副本。
–config 更新系统默认的配置。
查看当前服务器中的所有 topic
[vagrant@localhost config]$ /home/vagrant/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list

没有topic,显示空行

创建topic

创建 first topic

[vagrant@localhost bin]$ /home/vagrant/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --create --partitions 1 --replication-factor 3 --topic first
Created topic first.

选项说明:

--topic 定义 topic 名--replication-factor 定义副本数--partitions 定义分区数 查看 first 主题的详情

[vagrant@localhost bin]$ /home/vagrant/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --topic first
Topic: first    TopicId: YDkyV_QwSoCdutAXiMkzig PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
修改分区数

注意:分区数只能增加,不能减少

[vagrant@localhost config]$ kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --alter --topic first --partitions 3
[vagrant@localhost config]$ kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --topic first
Topic: first    TopicId: YDkyV_QwSoCdutAXiMkzig PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: first    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: first    Partition: 2    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
删除 topic
kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --delete --topic first
生产者命令行操作 生产者命令参数
[vagrant@localhost config]$ kafka-console-producer.sh
Missing required option(s) [bootstrap-server]
Option                                   Description                            
------                                   -----------                            
--batch-size              Number of messages to send in a single 
                                           batch if they are not being sent     
                                           synchronously. (default: 200)        
--bootstrap-server                               (deprecated) is specified. The server
                                           (s) to connect to. The broker list   
                                           string in the form HOST1:PORT1,HOST2:
                                           PORT2.                               
--broker-list       DEPRECATED, use --bootstrap-server     
                                           instead; ignored if --bootstrap-     
                                           server is specified.  The broker     
                                           list string in the form HOST1:PORT1, 
                                           HOST2:PORT2.                         
--compression-codec [String:             The compression codec: either 'none',  
  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.  
                                           If specified without value, then it  
                                           defaults to 'gzip'                   
--help                                   Print usage information.               
--line-reader      The class name of the class to use for 
                                           reading lines from standard in. By   
                                           default each line is read as a       
                                           separate message. (default: kafka.   
                                           tools.                               
                                           ConsoleProducer$LineMessageReader)   
--max-block-ms                                     block for during a send request      
                                           (default: 60000)                     
--max-memory-bytes                                 to buffer records waiting to be sent 
                                           to the server. (default: 33554432)   
--max-partition-memory-bytes            partition. When records are received 
                                           which are smaller than this size the 
                                           producer will attempt to             
                                           optimistically group them together   
                                           until this size is reached.          
                                           (default: 16384)                     
--message-send-max-retries      Brokers can fail receiving the message 
                                           for multiple reasons, and being      
                                           unavailable transiently is just one  
                                           of them. This property specifies the 
                                           number of retries before the         
                                           producer give up and drop this       
                                           message. (default: 3)                
--metadata-expiry-ms                      after which we force a refresh of    
                                           metadata even if we haven't seen any 
                                           leadership changes. (default: 300000)
--producer-property                            properties in the form key=value to  
                                           the producer.                        
--producer.config   Producer config properties file. Note  
                                           that [producer-property] takes       
                                           precedence over this config.         
--property                 A mechanism to pass user-defined       
                                           properties in the form key=value to  
                                           the message reader. This allows      
                                           custom configuration for a user-     
                                           defined message reader. Default      
                                           properties include:                  
                                                parse.key=true|false                  
                                                key.separator=         
                                                ignore.error=true|false               
--request-required-acks                    requests (default: 1)                
--request-timeout-ms                               requests. Value must be non-negative 
                                           and non-zero (default: 1500)         
--retry-backoff-ms              Before each retry, the producer        
                                           refreshes the metadata of relevant   
                                           topics. Since leader election takes  
                                           a bit of time, this property         
                                           specifies the amount of time that    
                                           the producer waits before refreshing 
                                           the metadata. (default: 100)         
--socket-buffer-size      The size of the tcp RECV size.         
                                           (default: 102400)                    
--sync                                   If set message send requests to the    
                                           brokers are synchronously, one at a  
                                           time as they arrive.                 
--timeout           If set and the producer is running in  
                                           asynchronous mode, this gives the    
                                           maximum amount of time a message     
                                           will queue awaiting sufficient batch 
                                           size. The value is given in ms.      
                                           (default: 1000)                      
--topic                   REQUIRED: The topic id to produce      
                                           messages to.                         
--version                                Display Kafka version.      
参数描述
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。
发送消息
[vagrant@localhost config]$ kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic first
>hello1

先发一个消息:hello1

消费者命令行操作 消费者命令参数
[vagrant@localhost bin]$ kafka-console-consumer.sh
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description                            
------                                   -----------                            
--bootstrap-server                                                                    
--consumer-property                            properties in the form key=value to  
                                           the consumer.                        
--consumer.config   Consumer config properties file. Note  
                                           that [consumer-property] takes       
                                           precedence over this config.         
--enable-systest-events                  Log lifecycle events of the consumer   
                                           in addition to logging consumed      
                                           messages. (This is specific for      
                                           system tests.)                       
--formatter               The name of a class to use for         
                                           formatting kafka messages for        
                                           display. (default: kafka.tools.      
                                           DefaultMessageFormatter)             
--from-beginning                         If the consumer does not already have  
                                           an established offset to consume     
                                           from, start with the earliest        
                                           message present in the log rather    
                                           than the latest message.             
--group       The consumer group id of the consumer. 
--help                                   Print usage information.               
--include   Regular expression specifying list of  
                                           topics to include for consumption.   
--isolation-level                Set to read_committed in order to      
                                           filter out transactional messages    
                                           which are not committed. Set to      
                                           read_uncommitted to read all         
                                           messages. (default: read_uncommitted)
--key-deserializer                                                          
--max-messages    The maximum number of messages to      
                                           consume before exiting. If not set,  
                                           consumption is continual.            
--offset         The offset to consume from (a non-     
                                           negative number), or 'earliest'      
                                           which means from beginning, or       
                                           'latest' which means from end        
                                           (default: latest)                    
--partition          The partition to consume from.         
                                           Consumption starts from the end of   
                                           the partition unless '--offset' is   
                                           specified.                           
--property                 The properties to initialize the       
                                           message formatter. Default           
                                           properties include:                  
                                          print.timestamp=true|false            
                                          print.key=true|false                  
                                          print.offset=true|false               
                                          print.partition=true|false            
                                          print.headers=true|false              
                                          print.value=true|false                
                                          key.separator=         
                                          line.separator=       
                                          headers.separator=    
                                          null.literal=           
                                          key.deserializer=   
                                          value.deserializer=                        
                                          header.deserializer=                        
                                         Users can also pass in customized      
                                           properties for their formatter; more 
                                           specifically, users can pass in      
                                           properties keyed with 'key.          
                                           deserializer.', 'value.              
                                           deserializer.' and 'headers.         
                                           deserializer.' prefixes to configure 
                                           their deserializers.                 
--skip-message-on-error                  If there is an error when processing a 
                                           message, skip it instead of halt.    
--timeout-ms        If specified, exit if no message is    
                                           available for consumption for the    
                                           specified interval.                  
--topic                   The topic to consume on.               
--value-deserializer                                                       
--version                                Display Kafka version.                 
--whitelist                                 ignored if --include specified.      
                                           Regular expression specifying list   
                                           of topics to include for consumption.
参数描述
–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。
–from-beginning从头开始消费。
–group 指定消费者组名称。
消费消息 消费 first 主题中的数据
[vagrant@localhost bin]$ kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic first

这是就会看到控制台上没有消息,因为消费者在生产者发送hello1消息之后启动的,现在生产者接着发消息hello2 ,这时消费者就能消费到:

[vagrant@localhost bin]$ kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic first
hello2
把主题中所有的数据都读取出来(包括历史数据)
[vagrant@localhost bin]$ kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --from-beginning --topic first
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746550.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号