
kafka-2: Cluster Setup, topic + partition Consumption Logic

Kafka Cluster Setup

This guide uses Kafka 2.6.0:
https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
Upload the archive to each server.

Extract the archive
$ tar -xzf kafka_2.13-2.6.0.tgz
$ cd kafka_2.13-2.6.0/

Configuration items to modify in config/server.properties (broker.id must be unique per broker):

broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
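
On a three-node cluster each broker needs its own broker.id (and its own log.dirs if brokers share a host). A hedged sketch of stamping out per-broker config files; the template below is a stub standing in for config/server.properties:

```shell
# Stub template standing in for config/server.properties.
cat > /tmp/server.template <<'EOF'
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
EOF

# One properties file per broker: unique broker.id and log directory.
for i in 0 1 2; do
  sed -e "s|^broker.id=.*|broker.id=$i|" \
      -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs-$i|" \
      /tmp/server.template > /tmp/server-$i.properties
done

grep -h '^broker.id' /tmp/server-?.properties
```

Each generated file differs only in broker.id and log.dirs; everything else comes from the template.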


Add the Kafka bin directory to the PATH

vi /etc/profile
export KAFKA_HOME=/home/vmuser/kafka_2.13-2.6.0
export PATH=$PATH:${KAFKA_HOME}/bin
source /etc/profile

Start ZooKeeper, then open the ZooKeeper client; the root currently contains:

[zk: localhost:2181(CONNECTED) 1] ls /
[bbb, ooxx, zookeeper]

After configuring, start Kafka:

 kafka-server-start.sh config/server.properties

[zk: localhost:2181(CONNECTED) 4] ls /
[admin, bbb, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, ooxx, zookeeper]

[zk: localhost:2181(CONNECTED) 3] get -s /controller
{"version":1,"brokerid":0,"timestamp":"1646317349694"}  
cZxid = 0x50000001c
ctime = Thu Mar 03 22:22:29 CST 2022
mZxid = 0x50000001c
mtime = Thu Mar 03 22:22:29 CST 2022
pZxid = 0x50000001c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x20d24b2a3e60000
dataLength = 54
numChildren = 0
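
The /controller znode is ephemeral and names the broker currently elected as cluster controller. A small sketch of pulling brokerid out of the JSON shown above (sed stands in for a real JSON parser):

```shell
# JSON copied from the /controller znode above.
json='{"version":1,"brokerid":0,"timestamp":"1646317349694"}'

# Extract the brokerid field.
brokerid=$(printf '%s' "$json" | sed -n 's/.*"brokerid":\([0-9]*\).*/\1/p')
echo "controller is broker $brokerid"
```

Because the znode is ephemeral (note the non-zero ephemeralOwner above), it disappears when that broker's session ends, triggering a new controller election.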

[zk: localhost:2181(CONNECTED) 8] ls /brokers 
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics 
[]

At this point the layout feels messy: if different people configure Kafka, or other applications share the same ZooKeeper ensemble, everything piles up in the ZooKeeper root. Better to isolate Kafka under its own chroot path:

    Enter the zk client and delete the old Kafka znodes from the root, then append a /kafka chroot to the connection string:
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka
    Restarting Kafka then fails:
The broker is trying to join the wrong cluster. Configured zookeeper.connect
zookeeper.connect may be wrong.
    at kafka.server.KafkaServer.startup(KafkaServer.scala:223)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
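
Why this happens: on first start each broker records its cluster id in a meta.properties file under log.dirs. After the ZooKeeper data moves under /kafka a fresh cluster id is created there, and the one stored on disk no longer matches, hence "wrong cluster". A simulated file for illustration (the cluster.id value here is made up):

```shell
# Simulated log.dirs location; on a real broker this is the configured log.dirs.
mkdir -p /tmp/kafka-logs-demo
cat > /tmp/kafka-logs-demo/meta.properties <<'EOF'
version=0
broker.id=0
cluster.id=DemoClusterId-NOT-REAL
EOF

# The stored id a broker compares against the one registered in ZooKeeper.
grep '^cluster.id' /tmp/kafka-logs-demo/meta.properties
```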

Delete the files under Kafka's configured log directory (it caches the old cluster id), then restart.
Re-enter zk:

[zk: localhost:2181(CONNECTED) 0] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
Create a topic
[root@localhost vmuser]# kafka-topics.sh --zookeeper node1:2181/kafka --create --topic ooxx --partitions  2 --replication-factor 2
Created topic ooxx.
create: create a new topic.
PartitionCount: number of partitions.
ReplicationFactor: number of replicas.
Partition: partition number, increasing from 0.
Leader: broker.id of the broker currently serving this partition.
Replicas: list of broker.ids holding this partition's replicas; the first in the list is the preferred leader.
Isr: broker.ids of this partition's in-sync replicas currently available in the Kafka cluster.
[root@localhost vmuser]# kafka-topics.sh --zookeeper node1:2181/kafka --list
ooxx
[root@localhost vmuser]# kafka-topics.sh --zookeeper node1:2181/kafka --describe --topic ooxx
Topic: ooxx     PartitionCount: 2       ReplicationFactor: 2    Configs: 
        Topic: ooxx     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2
        Topic: ooxx     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
[root@localhost vmuser]# 
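
Reading the describe output: with 3 brokers and a start index chosen at topic-creation time, partition p's preferred leader is broker (start + p) % 3. The leaders 2 and 0 above are consistent with start = 2; follower placement adds a further per-partition shift, which this simplified sketch omits:

```shell
# 3 brokers, start index 2 (inferred from the describe output above).
n=3; start=2
for p in 0 1; do
  echo "partition $p preferred leader: broker $(( (start + p) % n ))"
done
```

Spreading leaders this way balances read/write load across brokers, since all traffic for a partition goes through its leader.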
Consumer
[root@localhost config]# kafka-console-consumer.sh
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to.
  connect to>
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--help                                   Print usage information.
--isolation-level <String>               Set to read_committed in order to
                                           filter out transactional messages
                                           which are not committed. Set to
                                           read_uncommitted to read all
                                           messages. (default: read_uncommitted)
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--offset <String: consume offset>        The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
                                           Consumption starts from the end of
                                           the partition unless '--offset' is
                                           specified.
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                                print.timestamp=true|false
                                                print.key=true|false
                                                print.value=true|false
                                                key.separator=<key.separator>
                                                line.separator=<line.separator>
                                                key.deserializer=<key.deserializer>
                                                value.deserializer=<value.deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.' and 'value.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:
  deserializer for values>
--version                                Display Kafka version.
--whitelist <String: whitelist>          Regular expression specifying
                                           whitelist of topics to include for
                                           consumption.

(Aside: the kafka-topics.sh replica-assignment parameter can instead manually specify the mapping between topic partition replicas and Kafka brokers.)

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

Because the listener address was not configured above, this fails. Set listeners to each host's own address:

listeners=PLAINTEXT://<host ip>:9092

Consumer

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic ooxx  --group kafkatest
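
Within one consumer group, each partition is consumed by exactly one member, so two consumers joining group kafkatest would split ooxx's two partitions between them. A round-robin-style sketch (the consumer names c1/c2 are made up for illustration):

```shell
# Two partitions dealt out round-robin to two group members.
for p in 0 1; do
  case $(( p % 2 )) in
    0) c=c1 ;;
    1) c=c2 ;;
  esac
  echo "partition $p -> consumer $c"
done
```

A third consumer in the same group would sit idle, since there are only two partitions to assign; partition count caps a group's parallelism.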

Producer

kafka-console-producer.sh  --broker-list node3:9092 --topic ooxx
>hello
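
How the producer picks a partition for a keyed record: Kafka's default partitioner hashes the key (murmur2) modulo the partition count, so records with the same key always land in the same partition. Illustration only: cksum below is a stand-in hash, NOT Kafka's actual murmur2:

```shell
# ooxx has 2 partitions; same key -> same partition, preserving per-key order.
partitions=2
key="hello"
h=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
echo "key '$key' -> partition $(( h % partitions ))"
```

Console-producer records like the "hello" above carry no key, in which case the producer spreads them across partitions instead of hashing.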