栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka从入门到不放弃

kafka从入门到不放弃

目录

一、kafka基本概述

二、生产者

三、消费者

3.1 常用API

3.2 再均衡策略

3.3 分区分配策略

四、服务端

4.1 控制器

4.2 日志存储

4.3 日志清理

五、zookeeper

六、常用配置

6.1 生产者基本参数

6.2 消费者基本参数

6.3 服务端基本参数

七、常用命令

一、kafka基本概述

kafka是Apache下的一款采用Scala+java语言开发的多分区、多副本的分布式消息系统,具有高吞吐量、可持久化、可水平扩展、支持流数据处理等特性。在2.8.0之前依赖于zookeeper协调器。

kafka的基础架构:

 kafka架构是由多个不同角色组成,相互协作:

1)Producer:生产者,向kafka发布消息的角色;

2)Consumer:消费者,从kafka中拉取消息的角色;

3)Consumer Group:消费者组,跟消费者是一对多的关系,一个消费组包含多个消费者,某一个消费者一定属于某个消费组,同一个消费组的不同消费者不能消费同一个分区中的消息;

3)Broker:服务节点,一个kafka实例对应一个broker,集群是由多个broker组成;

4)Topic:指一类消息的总称,逻辑概念,生产者、消费者必须要指定对应的topic;

5)Partition:分区,一个topic可以分为多个partition,一个partition里面的消息是有顺序的,不同partition中的消息无序;

6)Replica:副本,为了保证数据的高可用,提高kafka容灾能力,kafka通过Replica保存多份数据,一个分区可以包括多个副本,副本包括leader和follower,leader负责读写,follower只负责同步数据;kafka要求副本必须要不同的broker中;

7)zookeeper:管理kafka中的元数据,比如broker、topic、partition,还有包括controller控制器的选举等。

kafka中的元数据在zookeeper节点的分布情况:

AR(Assigned Replicas):分区中所有副本的统称;

ISR(In-Sync-Replicas):所有与leader数据保持一定同步的副本(包括leader副本)统称;

OSR(Out-of-Sync-Replicas):与leader数据同步滞后的副本统称;

AR=ISR集合+OSR集合;正常情况下,所有副本都能和leader副本数据保持同步,即AR=ISR。

leader副本负责维护和跟踪ISR集合中follower副本滞后状态,如果follower副本滞后太多或者失效,leader副本会将该follower副本从ISR中剔除,加入到OSR集合中。如果OSR集中的follower追上leader副本,leader副本会负责将该follower从OSR中转移到ISR集合。如果leader副本宕机或者失效,controller会从ISR集合中选择一个follower作为新的leader。

ISR与HW和LEO有密切关系。HW(High Watermark)俗称高水位,标识一个特定的消息偏移量(offset),消费者拉取消息只能拉取这个offset之前的数据。

LEO(Log End Offset),标识日志文件下一条将要写的消息偏移量。如下图所示:

其中ISR副本集合中最小的LEO作为分区的HW,即消费者只能消费该分区HW之前的消息。 

二、生产者

即消息的生产者,producer几个重要参数

客户端可以选择不同的构造函数 :

其中topic、value是必传的,partition、key、timestamp可选参数,producer可以通过headers添加一些必要参数,比如用来做幂等操作。

producer发送的整个过程:

拦截器:默认是没有的,可以在发送消息之前做一些准备工作,比如过滤、修改数据等。用户可以通过实现ProducerInterceptor自定义拦截器。

序列化:将消息对象转化成字节数组。kafka自带的序列化有StringSerializer,ByteArray、ByteBuffer、Double、Integer、Long类型的序列化,用户也可以通过实现Serializer接口自定义,要注意的是必须要和消费者反序列保持一致,不然消费的时候会发生错误。

分区器:如果producer在发送消息的时候指定partition,就不需要分区器,如果只是指定了key,默认分区器(DefaultPartioner)首先对key进行哈希(采用MurmurHash2算法),然后对分区数进行取模,所以相同key的消息会写入同一个分区中。如果不指定key,消息会以轮询的方式写入各个分区。

由于2.4之前的版本,如果未指定key,会根据轮询方式,将消息发送给各个分区,在生产者侧,真正发送到kafka集群的是sender线程,发送条件是达到batch.size大小或者是积累的时间超过linger.ms,才会发送,可能导致消费积压发送延迟。

2.4版本之后,引入了StickyPartitioning Strategy(粘性分区策略),解决了由于将消费分成小batch导致发送延迟的问题,粘性分区策略,会将消息统一发送给某个分区,从而保证消费能够快速发送到broke中。

三、消费者

每一个消费者必属于某个消费组,在消费的时候必须要指定消费组group.id、消费主题,一个消费者可以同时消费多个主题。

3.1 常用API

通过subscribe()方法订阅主题

public void subscribe(Collection topics, ConsumerRebalanceListener listener);
public void subscribe(Collection topics);
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener);
public void subscribe(Pattern pattern);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
consumer.subscribe(Pattern.compile("topic-.*"));

除了subscribe外,还可以通过assign()进行订阅

public void assign(Collection partitions);

TopicPartition必须要指定topic和partition;可以通过partionsFor获取该topic下的partition列表;

public List partionsFor(String topic);

可以通过record()方法获取消息集中指定分区的消息

ConsumerRecords poll = kafkaConsumer.poll(Duration.ofSeconds(5));
Iterable> records = poll.records(TopicPartition partition);

seek()方法可以指定分区从哪个位置开始消费,只能重置消费消费者分配到的分区的消费位置;

可以通过assignment()获取消费者分配到的分区信息

public void seek(TopicPartition partition, long offset);

Set assignment = consumer.assignment();
for (TopicPartition tp: assignment) {
    consumer.seek(tp, 10);
}
public Map beginningOffsets(Collection partitions); //获取开始的分区信息
public Map endOffsets(Collection partitions); //获取末尾的分区信息

Map offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment){
    consumer.seek(tp, offsets.get(tp));
}

public void seekToBeginning(Collection partitions);
public void seekToEnd(Collection partitions);

比如想要消费昨天8点之后的消息,offsetsForTimes()方法可以实现;

public Map offsetsForTimes(Map, timestampsToSearch) 
public Map offsetsForTimes(Map, timestampsToSearch, Duration timeout)

返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp中的offset和timestamp字段;

Map timestampToSearch = new HashMap<>(); 
for (TopicPartition tp : assignment) { 
    timestampToSearch.put(tp, System.currentTimeMillis()一1*24*3600*1000);
}
Map offsets = consumer.offsetsForTimes(timestampToSearch); 
for (TopicPartition tp : assignment){
    OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); 
    if (offsetAndTimestamp ! = null) { 
        consumer.seek(tp, offsetAndTimestamp. offset()); 
    }
}

消费流程:通过poll()方法拉取消息、对消息进行反序列化、执行消费者拦截器;

消费者拦截器:实现ConsumerInterceptor接口。比如可以过滤已经过期的消息。

poll()方法中涉及到消息位移、再均衡的逻辑、消费者协调器、组协调器、消费者选举、分区分配的分发、心跳等多个内容。

消费者在消费完数据时需要将消息的偏移量保存起来,防止消费者重启后重复消费;消费位移是存储再内部主题__consumer_offsets中。

3.2 再均衡策略

在消费过程中,当发送以下中的一种情况,会发生reblance:

1)有新的消费者加入或者是有消费者发送宕机退出消费组

2)消费组中所订阅的主题数发送变化

3)消费组中所订阅的分区数发送变化

比如某应用为了提高消费能力,新增消费者节点:

1)Find Coordinator:新增的消费者需要知道该消费组中Coordinator所在的broker,创建与该节点的网络连接。首先该消费者向集群中的某个broker发送FindCoordinatorRequest请求,这里的某个broker是指集群中负载最小的节点。broker收到请求后,根据消费者传的groupId计算__consumer_offsets的分区编号,计算方式为:

Utils.abs(groupId.hashCode) % groupmetadataTopicPartitionCount(主题__consumer_offsets分区数)

找到该分区的leader副本所在的broker节点即是该组对应的GroupCoordinator。

2)Join Group:找到GroupCoordinator节点后,消费者向该节点发送JoinGroupRequest请求,并且阻塞等待服务端响应;GroupCoordinator收到请求后,首先做合法性请求校验,比如group_id是否为空;同时服务端会在其他消费者下一个心跳时通知其他消费者进行reblance,其他消费者收到后revoke各自的partition,也向GroupCoordinator发送JoinGroupRequest请求;

GroupCoordinator需要为消费组中选择一个leader,选择分为两种情况;如果消费组中没有leader,那么第一个加入消费组的消费者为leader;如果某一个时刻leader退出了消费组,会随机选举一个新的leader;GroupCoordinator是通过hashMap存储消费者信息的,选择第一个键值对的消费者作为leader;

选举分区分配策略:每个消费者会将自己支持的分配策略发送给GroupCoordinator,选举算法为:

1)GroupCoordinator收集每个消费者支持的分配策略;

2)从每个消费者找出第一个自身支持的策略,为其投上一票;

3)计算各个策略的投票数,找出票数最多的为消费组的分配策略;

GroupCoordinator发送JoinGroupResponse响应给各个消费者,其中leader接收到所有消费者信息和分配策略,

3)Sync Group:leader根据分配策略进行分区分配;各个消费者向GroupCoordinator发送SyncGroupRequest请求进行同步分配方案,此时leader会将分配的方案发送给GroupCoordinator,GroupCoordinator接收到方案后会将消费组的元数据存入__consumer_offsets主题中,然后发送给各个消费者。之后新的消费者开启心跳任务,定期向服务端GroupCoordinator发送HeartbeatRequest请求。

3.3 分区分配策略

1)RangeAssignor分配策略

该分配策略是topic粒度的,将消费者的总数和该topic下的所有分区数整除运算,然后将分区进行平均分配。

例子1:消费组中有两个消费者C0,C1,同时订阅了t0,t1,每个topic有4个分区,所有的分区标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3,最终的分配结构为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3

例子2:消费组中有两个消费者C0,C1,同时订阅了t0,t1,每个topic有3个分区,所有的分区标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,最终的分配结构为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2

2)RoundRobinAssignor分配策略

将消费者中所有消费者以及所订阅的所有主题下的分区进行排序,然后通过轮询方式将分区分配给消费者。

例子1:消费组中有两个消费者C0,C1,同时订阅了t0,t1,每个topic有3个分区,所有的分区标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,最终的分配结构为:

消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2

例子2:三个消费者C0,C1,C2,订阅三个主题,三个主题对应的分区数分别为1、2、3,C0订阅t0,C1订阅t0,t1,C2订阅t0,t1,t2,所有的分区标识为:t0p0、t1p0、t1p1、t2p0、t2p1、t2p2,最终的分配结构是:

消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

3)0.11版本后,新增了StickyAssignor分配策略

        a)分区的分配尽可能均匀;

        b)分区的分配尽可能与上次分配的保持相同。

例子1:三个消费者C0,C1,C2,订阅三个主题,三个主题对应的分区数分别为1、2、3,C0订阅t0,C1订阅t0,t1,C2订阅t0,t1,t2,所有的分区标识为:t0p0、t1p0、t1p1、t2p0、t2p1、t2p2,最终的分配结构是:

消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2

例子2:有三个消费者,同时订阅了四个topic,每个topic有2个分区,所有分区标识为:t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1

刚开始,RoundRobinAssignor和StickyAssignor分配结果是:

消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1

如果此时C0脱离了消费组后,RoundRobinAssignor分配结果是:

消费者C1:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1

StickyAssignor分配结果是:

消费者C1:t0p1、t2p0、t3p1、t1p1
消费者C2:t1p0、t2p1、t0p0、t3p0

4)2.4版本后,新增了CooperativeStickyAssignor分配策略,为了减少stop-the-world时间。

前几种策略中,消费者在收到GroupCoordinator的reblance后,都是revoke之前的partition,由GroupCoordinator重新分配;CooperativeStickyAssignor策略是分成多次reblance,不会revoke各自的partition,将各自的partition策略发送给GroupCoordinator,GroupCoordinator会尽量保持之前的策略,通知到消费者,消费者继续消费分配的partition,未分配的partition继续reblance;

5)自定义分配策略:实现PartitionAssignor接口

2.4版本后,引入了静态成员,消费者在可以通过配置group.instance.id,当发送reblance时,在session.timeout.ms内,消费者的分配策略不会发送变化。

四、服务端

4.1 控制器

Controller:选择kafka集群中某一个broker作为集群的Controller,主要负责集群中分区和副本的状态,比如当某个分区的leader副本出现故障,控制器会从ISR集合中选择一个副本作为该飞去的leader副本,或者当检测到分区的ISR集合发送变化时,控制器会通知所有的broker更新其元数据信息,当新增分区时,也是由控制器进行负责分区的分配。

控制器选举:是通过zookeeper进行选择,在zk节点上创建一个临时节点/controller,如果创建成功,该broker,作为控制器,如果创建失败,说明已有控制器,其余未选中的broker会watch该节点。

分区leader的选举:按照AR集合中的顺序选举第一个存活的副本(优先副本策略),且这个副本在ISR集中。

优先副本策略的目的是为了让leader副本在集群中分别尽可能均匀:

Topic:topic-partitions PartitionCount: 3 ReplicationFactor: 3 Configs: 
Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 
Topic: topic-partitions Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 
Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
将broker2节点重启后:
Topic:topic-partitions PartitionCount: 3 ReplicationFactor: 3 Configs: 
Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2 
Topic: topic-partitions Partition: 1 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2 
Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

可自动平衡或者是手动平衡,auto.leader.rebalance.enable默认是true,即自动开启平衡策略,控制器会开启一个定时任务,定时轮询所有broker,如果出现分区不平衡会触发平衡策略,平衡策略会导致客户端一定时间的阻塞。建议改成手动触发。

分区重分配:当某个broker准备下线,该节点的副本需要手动迁移到集群中其他的broker节点上,或者新增broker,让现有的分区分配到新的节点上,需要分区重分配。

分区重分配原理:控制器为每个分区新加副本因子,新的副本因子会从该分区的leader副本复制所有的数据,复制完成后,控制器将旧副本从副本清单中移除。重新分配后有可能会出现leader分配不均衡,需要执行一次优先副本的选举。

分区数不能无线多的原因:

1)限制于服务器文件描述符数量的限制

2)因为分区的元数据都存储与zk上面,如果分区太多,对应的副本信息太多,对zk的读写造成一定的压力

3)由于每个broker和客户端都会缓存分区的信息,分区数太多,占用内存也会越大

4)分区中只有leader副本对外提高读写,当broker发生故障,该节点上的分区将不可用,需要控制器选举新的leader,如果分区太多,导致大量的分区进行leader角色切换,导致短时间内不可用。

4.2 日志存储

 日志被分为多个segment,以topic-partition命名,每个segment包括.log日志文件、.index稀疏索引文件、.timeindex时间戳索引文件,当日志达到一定的要求时,会产生新的segment:

1)当日志段文件的大小大于log.segment.bytes大小,默认1GB

2)日志分段中最大的时间戳与当前系统的时间戳大于log.roll.ms(优先级高)/hours:7天

3)索引文件达到log.index.size.max.bytes:10MB

4)追加的消息偏移量与日志段中最大的偏移量差值大于Integer.MAX_VALUE

稀疏索引文件的数据格式:

relativeOffset:相对偏移量,即对于baseOffset(当前文件的文件名)

potition:消息在日志文件中的物理位置

kafka使用ConcurrentSkipListMap保存各个日志分段,以baseOffset作为key。

比如ConcurrentSkipListMap保存的日志分段:00000000000000000000、00000000000100000000、00000000000200000000、00000000000300000000

现在想要查找偏移量为00000000000100010000的消息:首先在ConcurrentSkipListMap中找到00000000000100000000偏移量对应的稀疏索引文件,在索引文件中,通过二分法找到不大于30的数据18,然后根据对应的物理偏移量565在log文件中向下找到偏移量为30的消息。

 时间戳索引数据格式:

timestamp:当前日志分段中最大的时间戳

relativeOffset:时间戳对应的相对偏移量

查找步骤:

1)将需要查找的targetTimeStamp和每个日志分段中最大的时间戳largestTimeStamp逐一对比,找到不小于targetTimeStamp的日志分段。日志分段中largestTimeStamp的计算是先查询该日志分段对应的时间戳索引文件,找到最后一条索引项,若该值大于0,取其值,否则取日志分段的最近修改时间。

 2)找到日志分段文件后,在时间戳索引文件通过二分法查找到不大于targetTimeStamp的最大索引项,找打对应的相对偏移量,在索引文件中通过二分法找到不大于28的最大索引项,在通过日志分段文件找到对应的消息。

4.3 日志清理

kafka提高了两种日志清理策略。log.cleanup.policy=delete,compact

1)日志删除

a)基于时间保留策略:log.retention.hours:168

通过日志分段文件判断是否超过阈值,当前的时间与分段文件中largestTimeStamp(取对应的时间戳索引文件中最后一条索引项,如果大于0取该值,反之取分段文件的lastModifiedTime)比较。删除时,首先从跳跃表中移除待删除的日志分段,然后将对应的日志分段文件和索引文件后缀添加.delete。由delete-file命名的延迟任务删除以.delete为后缀的文件,file.delete.delay.ms参数:60000(1分钟);

b)基于日志大小保留策略:log.retention.bytes Log中所有日志文件的总大小:-1(无穷大)

首先计算日志文件的总大小和retensize的差值diff,即计算需要删除的日志总大小,然后从日志第一个日志分段开始查找可删除的分段文件的集合(剩余大小必须超过一个分段文件大小才能删除),之后的操作和基于时间一样。

c)基于日志起始偏移量的保留策略

基于日志起始偏移量logStartOffset策略判断依据时日志分段下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是,则可以删除该日志分段

eg:logstartOffset为35,遍历每一个日志段,日志段1下一个日志段的起始偏移量为11,小于35,加入删除集合,日志段2下一个日志段的起始偏移量为23小于35,加入删除集合,日志段3下一个日志段的起始偏移量为30小于35,加入删除集合,日志段4下一个日志段的起始偏移量为40大于35,所以,需要删除的日志段为1、2、3;

2)日志压缩

对于相同的key,不同的value值,只保留最后一个版本。

每个日志清理线程会使用SkimpyOffsetMap的对象存储key和offset的映射关系哈希表。日志清理会遍历两次日志文件,第一次遍历把所有的key的哈希值和最后出现offset保存在Map中,

第二次遍历会检查每个消息是否符合保留条件,即通过偏移量判断,不符合的被清理。

对日志分段文件进行分组:然后按照日志分段文件的顺序进行遍历,每组中的日志空间大小不超过log.segment.bytes,同一组日志清理后会将保留的消息复制到以.clean的临时文件中,文件名以第一个日志分段的文件名命名,

比如00000000000000000000.log.clean。Log Compaction过后将.clean文件修改为.swap,然后删除原来的日志文件,最后把.swap后缀去掉。

五、zookeeper

zookeeper在kafka集群中起着至关重要的角色,在2.8版本之前完全依赖于zookeeper,其中存在的问题有:

1)强依赖于zookeeper,kafka集群中的元数据存在zookeeper,需要额外的维护;

2)当controller宕机后,需要依赖于zookeeper重新选举,新的controller需要从zookeeper上获取元数据,且同步到各个broker,期间kafka是停止对外服务的;

3)当分区变多时,存储在zookeeper的数据变多,导致读写性能降低。zookeeper写操作是由leader完成,并且需要超过半数投票才能通过,并发写性能比较低

综上原因,kafka在2.8版本后由内部的quorum来替代zookeeper和controller,运行期只有一个活跃的controller,集群中的节点通过KRaft进行数据同步和交互,目前该版本只推荐在测试环境中使用。

六、常用配置

6.1 生产者基本参数

bootstrap.servers:kafka集群地址,host1:port1,host2:port2,非必须设置所有的地址,生产者会从给定的broker里查到其他broker信息

key.serializer/value.serializer:序列化方式

buffer.memory:RecordAccumulator:缓存的大小,默认为32MB,如果生产者发送的消息超过这个值,导致kafkaProducer的send方法阻塞或者异常,取决于max.block.ms的配置,默认为60000(60秒)

max.in.flight.requests.per.connection:客户端与Node之间的连接数,默认为5,超过该值,就不能向这个连接发送更多的请求了,除非有缓存请求收到响应,通过Deque的size大小与这个参数的来判断Node负载情况,最小的为leastLoadedNode

metadata.max.age.ms:300000,超过该值没有更新数据会引起元数据的更新操作。

ack:0表示发送消息到broker立即返回,1表示leader副本写入成功后立即返回,-1表示ISR集合中的所有副本写入成功后才能返回。

max.request.size:发送消息的最大值,默认为1MB,与broker中的message.max.bytes有联动

retries/retry.backoff.ms:重试次数,默认为0,以及重试时间间隔,默认为100

compression.type:消息压缩方式,默认为none

6.2 消费者基本参数

partition.assignment.strategy:分区分配策略

group.id:消费组名称

key. deserializer/value. deserializer:反序列化方式

client.id:consumer-webhook-group-LAPTOP-ED8SE5FV-4

fetch.min.bytes:poll()拉取最小数据量,默认1B

fetch.max.bytes:poll()拉取最大数据量,默认50MB

max.poll.records:一次拉取的最大消息数,默认为500条

enable.auto.commit:是否开启自动提交,默认为true

auto.commit.interval.ms:自动提交时间,默认为5s

auto.offset.reset:latest:kafka消费位置,如果有已提交的offset,则从offset开始消费,如果没有提交的offset,earliest表示从头开始消费,latest表示从新产生的位置开始消费

6.3 服务端基本参数

zookeeper.connect:连接的zookeeper集群服务地址,在设置的时候可以添加一个chroot路径,eg:localhost:2181,localhost:2182,localhost:2183/kafka

broker.id:指定kafka集群中broker的唯一标识,默认为-1

log.dir/log.dirs:日志文件存放的根目录

message.max.bytes:指定broker所能接收消息的最大值
auto.create.topics.enable:是否开启自动创建主题功能,默认为true

auto.leader.rebalance.enable:是否开启自动leader再均衡功能,默认为true

delete.topic.enable:是否可以删除主题,默认为true

log.retention.hours:日志保存时间,默认为168(7天)

log.retention.bytes:日志最大保留大小,默认为-1

七、常用命令
kafka-topics.sh --bootstrap-server localhost;9092 --create --topic topic-test --partitions 3 -replication-factor 1

创建主题,还可以通过--list、--describe、--alter、--delete进行删、改、查

kafak-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name topic-config

查看主题的配置,--entity-type指定查看配置的实体类型,包括topics、brokers、clients、users,--entity-name指定配置的名称,对应的主题名、brokerId值、clientId值、指定用户名

kafka-producer-perf-test.sh --topic topic-test --num-records 1000 --record-size 1024 --throughput -1 bootstrap.servers=127.0.0.1:9092

用来测试生产数据,--throughput 限制每秒发送的最大的消息数,设为 -1 表示不限制

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list:查看消费者列表
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group:查看指定消费组详情

参考文献:《深入理解Kafka核心设计与实践原理》

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350379.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号