目录
一、kafka基本概述
二、生产者
三、消费者
3.1 常用API
3.2 再均衡策略
3.3 分区分配策略
四、服务端
4.1 控制器
4.2 日志存储
4.3 日志清理
五、zookeeper
六、常用配置
6.1 生产者基本参数
6.2 消费者基本参数
6.3 服务端基本参数
七、常用命令
一、kafka基本概述
kafka是Apache下的一款采用Scala+java语言开发的多分区、多副本的分布式消息系统,具有高吞吐量、可持久化、可水平扩展、支持流数据处理等特性。在2.8.0之前依赖于zookeeper协调器。
kafka的基础架构:
kafka架构是由多个不同角色组成,相互协作:
1)Producer:生产者,向kafka发布消息的角色;
2)Consumer:消费者,从kafka中拉取消息的角色;
3)Consumer Group:消费者组,跟消费者是一对多的关系,一个消费组包含多个消费者,某一个消费者一定属于某个消费组,同一个消费组的不同消费者不能消费同一个分区中的消息;
3)Broker:服务节点,一个kafka实例对应一个broker,集群是由多个broker组成;
4)Topic:指一类消息的总称,逻辑概念,生产者、消费者必须要指定对应的topic;
5)Partition:分区,一个topic可以分为多个partition,一个partition里面的消息是有顺序的,不同partition中的消息无序;
6)Replica:副本,为了保证数据的高可用,提高kafka容灾能力,kafka通过Replica保存多份数据,一个分区可以包括多个副本,副本包括leader和follower,leader负责读写,follower只负责同步数据;kafka要求副本必须要不同的broker中;
7)zookeeper:管理kafka中的元数据,比如broker、topic、partition,还有包括controller控制器的选举等。
kafka中的元数据在zookeeper节点的分布情况:
AR(Assigned Replicas):分区中所有副本的统称;
ISR(In-Sync-Replicas):所有与leader数据保持一定同步的副本(包括leader副本)统称;
OSR(Out-of-Sync-Replicas):与leader数据同步滞后的副本统称;
AR=ISR集合+OSR集合;正常情况下,所有副本都能和leader副本数据保持同步,即AR=ISR。
leader副本负责维护和跟踪ISR集合中follower副本滞后状态,如果follower副本滞后太多或者失效,leader副本会将该follower副本从ISR中剔除,加入到OSR集合中。如果OSR集中的follower追上leader副本,leader副本会负责将该follower从OSR中转移到ISR集合。如果leader副本宕机或者失效,controller会从ISR集合中选择一个follower作为新的leader。
ISR与HW和LEO有密切关系。HW(High Watermark)俗称高水位,标识一个特定的消息偏移量(offset),消费者拉取消息只能拉取这个offset之前的数据。
LEO(Log End Offset),标识日志文件下一条将要写的消息偏移量。如下图所示:
其中ISR副本集合中最小的LEO作为分区的HW,即消费者只能消费该分区HW之前的消息。
二、生产者
即消息的生产者,producer几个重要参数
客户端可以选择不同的构造函数 :
其中topic、value是必传的,partition、key、timestamp可选参数,producer可以通过headers添加一些必要参数,比如用来做幂等操作。
producer发送的整个过程:
拦截器:默认是没有的,可以在发送消息之前做一些准备工作,比如过滤、修改数据等。用户可以通过实现ProducerInterceptor自定义拦截器。
序列化:将消息对象转化成字节数组。kafka自带的序列化有StringSerializer,ByteArray、ByteBuffer、Double、Integer、Long类型的序列化,用户也可以通过实现Serializer接口自定义,要注意的是必须要和消费者反序列保持一致,不然消费的时候会发生错误。
分区器:如果producer在发送消息的时候指定partition,就不需要分区器,如果只是指定了key,默认分区器(DefaultPartioner)首先对key进行哈希(采用MurmurHash2算法),然后对分区数进行取模,所以相同key的消息会写入同一个分区中。如果不指定key,消息会以轮询的方式写入各个分区。
由于2.4之前的版本,如果未指定key,会根据轮询方式,将消息发送给各个分区,在生产者侧,真正发送到kafka集群的是sender线程,发送条件是达到batch.size大小或者是积累的时间超过linger.ms,才会发送,可能导致消费积压发送延迟。
2.4版本之后,引入了StickyPartitioning Strategy(粘性分区策略),解决了由于将消费分成小batch导致发送延迟的问题,粘性分区策略,会将消息统一发送给某个分区,从而保证消费能够快速发送到broke中。
三、消费者
每一个消费者必属于某个消费组,在消费的时候必须要指定消费组group.id、消费主题,一个消费者可以同时消费多个主题。
3.1 常用API
通过subscribe()方法订阅主题
public void subscribe(Collectiontopics, ConsumerRebalanceListener listener); public void subscribe(Collection topics); public void subscribe(Pattern pattern, ConsumerRebalanceListener listener); public void subscribe(Pattern pattern); consumer.subscribe(Arrays.asList("topic1", "topic2")); consumer.subscribe(Pattern.compile("topic-.*"));
除了subscribe外,还可以通过assign()进行订阅
public void assign(Collectionpartitions);
TopicPartition必须要指定topic和partition;可以通过partionsFor获取该topic下的partition列表;
public ListpartionsFor(String topic);
可以通过record()方法获取消息集中指定分区的消息
ConsumerRecordspoll = kafkaConsumer.poll(Duration.ofSeconds(5)); Iterable > records = poll.records(TopicPartition partition);
seek()方法可以指定分区从哪个位置开始消费,只能重置消费消费者分配到的分区的消费位置;
可以通过assignment()获取消费者分配到的分区信息
public void seek(TopicPartition partition, long offset); Setassignment = consumer.assignment(); for (TopicPartition tp: assignment) { consumer.seek(tp, 10); }
public MapbeginningOffsets(Collection partitions); //获取开始的分区信息 public Map endOffsets(Collection partitions); //获取末尾的分区信息 Map offsets = consumer.endOffsets(assignment); for (TopicPartition tp : assignment){ consumer.seek(tp, offsets.get(tp)); } public void seekToBeginning(Collection partitions); public void seekToEnd(Collection partitions);
比如想要消费昨天8点之后的消息,offsetsForTimes()方法可以实现;
public MapoffsetsForTimes(Map , timestampsToSearch) public Map offsetsForTimes(Map , timestampsToSearch, Duration timeout)
返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp中的offset和timestamp字段;
MaptimestampToSearch = new HashMap<>(); for (TopicPartition tp : assignment) { timestampToSearch.put(tp, System.currentTimeMillis()一1*24*3600*1000); } Map offsets = consumer.offsetsForTimes(timestampToSearch); for (TopicPartition tp : assignment){ OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); if (offsetAndTimestamp ! = null) { consumer.seek(tp, offsetAndTimestamp. offset()); } }
消费流程:通过poll()方法拉取消息、对消息进行反序列化、执行消费者拦截器;
消费者拦截器:实现ConsumerInterceptor接口。比如可以过滤已经过期的消息。
poll()方法中涉及到消息位移、再均衡的逻辑、消费者协调器、组协调器、消费者选举、分区分配的分发、心跳等多个内容。
消费者在消费完数据时需要将消息的偏移量保存起来,防止消费者重启后重复消费;消费位移是存储再内部主题__consumer_offsets中。
3.2 再均衡策略
在消费过程中,当发送以下中的一种情况,会发生reblance:
1)有新的消费者加入或者是有消费者发送宕机退出消费组
2)消费组中所订阅的主题数发送变化
3)消费组中所订阅的分区数发送变化
比如某应用为了提高消费能力,新增消费者节点:
1)Find Coordinator:新增的消费者需要知道该消费组中Coordinator所在的broker,创建与该节点的网络连接。首先该消费者向集群中的某个broker发送FindCoordinatorRequest请求,这里的某个broker是指集群中负载最小的节点。broker收到请求后,根据消费者传的groupId计算__consumer_offsets的分区编号,计算方式为:
Utils.abs(groupId.hashCode) % groupmetadataTopicPartitionCount(主题__consumer_offsets分区数)
找到该分区的leader副本所在的broker节点即是该组对应的GroupCoordinator。
2)Join Group:找到GroupCoordinator节点后,消费者向该节点发送JoinGroupRequest请求,并且阻塞等待服务端响应;GroupCoordinator收到请求后,首先做合法性请求校验,比如group_id是否为空;同时服务端会在其他消费者下一个心跳时通知其他消费者进行reblance,其他消费者收到后revoke各自的partition,也向GroupCoordinator发送JoinGroupRequest请求;
GroupCoordinator需要为消费组中选择一个leader,选择分为两种情况;如果消费组中没有leader,那么第一个加入消费组的消费者为leader;如果某一个时刻leader退出了消费组,会随机选举一个新的leader;GroupCoordinator是通过hashMap存储消费者信息的,选择第一个键值对的消费者作为leader;
选举分区分配策略:每个消费者会将自己支持的分配策略发送给GroupCoordinator,选举算法为:
1)GroupCoordinator收集每个消费者支持的分配策略;
2)从每个消费者找出第一个自身支持的策略,为其投上一票;
3)计算各个策略的投票数,找出票数最多的为消费组的分配策略;
GroupCoordinator发送JoinGroupResponse响应给各个消费者,其中leader接收到所有消费者信息和分配策略,
3)Sync Group:leader根据分配策略进行分区分配;各个消费者向GroupCoordinator发送SyncGroupRequest请求进行同步分配方案,此时leader会将分配的方案发送给GroupCoordinator,GroupCoordinator接收到方案后会将消费组的元数据存入__consumer_offsets主题中,然后发送给各个消费者。之后新的消费者开启心跳任务,定期向服务端GroupCoordinator发送HeartbeatRequest请求。
3.3 分区分配策略
1)RangeAssignor分配策略
该分配策略是topic粒度的,将消费者的总数和该topic下的所有分区数整除运算,然后将分区进行平均分配。
例子1:消费组中有两个消费者C0,C1,同时订阅了t0,t1,每个topic有4个分区,所有的分区标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3,最终的分配结构为:
消费者C0:t0p0、t0p1、t1p0、t1p1 消费者C1:t0p2、t0p3、t1p2、t1p3
例子2:消费组中有两个消费者C0,C1,同时订阅了t0,t1,每个topic有3个分区,所有的分区标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,最终的分配结构为:
消费者C0:t0p0、t0p1、t1p0、t1p1 消费者C1:t0p2、t1p2
2)RoundRobinAssignor分配策略
将消费者中所有消费者以及所订阅的所有主题下的分区进行排序,然后通过轮询方式将分区分配给消费者。
例子1:消费组中有两个消费者C0,C1,同时订阅了t0,t1,每个topic有3个分区,所有的分区标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,最终的分配结构为:
消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2
例子2:三个消费者C0,C1,C2,订阅三个主题,三个主题对应的分区数分别为1、2、3,C0订阅t0,C1订阅t0,t1,C2订阅t0,t1,t2,所有的分区标识为:t0p0、t1p0、t1p1、t2p0、t2p1、t2p2,最终的分配结构是:
消费者C0:t0p0 消费者C1:t1p0 消费者C2:t1p1、t2p0、t2p1、t2p2
3)0.11版本后,新增了StickyAssignor分配策略
a)分区的分配尽可能均匀;
b)分区的分配尽可能与上次分配的保持相同。
例子1:三个消费者C0,C1,C2,订阅三个主题,三个主题对应的分区数分别为1、2、3,C0订阅t0,C1订阅t0,t1,C2订阅t0,t1,t2,所有的分区标识为:t0p0、t1p0、t1p1、t2p0、t2p1、t2p2,最终的分配结构是:
消费者C0:t0p0 消费者C1:t1p0、t1p1 消费者C2:t2p0、t2p1、t2p2
例子2:有三个消费者,同时订阅了四个topic,每个topic有2个分区,所有分区标识为:t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1
刚开始,RoundRobinAssignor和StickyAssignor分配结果是:
消费者C0:t0p0、t1p1、t3p0 消费者C1:t0p1、t2p0、t3p1 消费者C2:t1p0、t2p1
如果此时C0脱离了消费组后,RoundRobinAssignor分配结果是:
消费者C1:t0p0、t1p0、t2p0、t3p0 消费者C2:t0p1、t1p1、t2p1、t3p1
StickyAssignor分配结果是:
消费者C1:t0p1、t2p0、t3p1、t1p1 消费者C2:t1p0、t2p1、t0p0、t3p0
4)2.4版本后,新增了CooperativeStickyAssignor分配策略,为了减少stop-the-world时间。
前几种策略中,消费者在收到GroupCoordinator的reblance后,都是revoke之前的partition,由GroupCoordinator重新分配;CooperativeStickyAssignor策略是分成多次reblance,不会revoke各自的partition,将各自的partition策略发送给GroupCoordinator,GroupCoordinator会尽量保持之前的策略,通知到消费者,消费者继续消费分配的partition,未分配的partition继续reblance;
5)自定义分配策略:实现PartitionAssignor接口
2.4版本后,引入了静态成员,消费者在可以通过配置group.instance.id,当发送reblance时,在session.timeout.ms内,消费者的分配策略不会发送变化。
四、服务端
4.1 控制器
Controller:选择kafka集群中某一个broker作为集群的Controller,主要负责集群中分区和副本的状态,比如当某个分区的leader副本出现故障,控制器会从ISR集合中选择一个副本作为该飞去的leader副本,或者当检测到分区的ISR集合发送变化时,控制器会通知所有的broker更新其元数据信息,当新增分区时,也是由控制器进行负责分区的分配。
控制器选举:是通过zookeeper进行选择,在zk节点上创建一个临时节点/controller,如果创建成功,该broker,作为控制器,如果创建失败,说明已有控制器,其余未选中的broker会watch该节点。
分区leader的选举:按照AR集合中的顺序选举第一个存活的副本(优先副本策略),且这个副本在ISR集中。
优先副本策略的目的是为了让leader副本在集群中分别尽可能均匀:
Topic:topic-partitions PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic-partitions Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 将broker2节点重启后: Topic:topic-partitions PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2 Topic: topic-partitions Partition: 1 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2 Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
可自动平衡或者是手动平衡,auto.leader.rebalance.enable默认是true,即自动开启平衡策略,控制器会开启一个定时任务,定时轮询所有broker,如果出现分区不平衡会触发平衡策略,平衡策略会导致客户端一定时间的阻塞。建议改成手动触发。
分区重分配:当某个broker准备下线,该节点的副本需要手动迁移到集群中其他的broker节点上,或者新增broker,让现有的分区分配到新的节点上,需要分区重分配。
分区重分配原理:控制器为每个分区新加副本因子,新的副本因子会从该分区的leader副本复制所有的数据,复制完成后,控制器将旧副本从副本清单中移除。重新分配后有可能会出现leader分配不均衡,需要执行一次优先副本的选举。
分区数不能无线多的原因:
1)限制于服务器文件描述符数量的限制
2)因为分区的元数据都存储与zk上面,如果分区太多,对应的副本信息太多,对zk的读写造成一定的压力
3)由于每个broker和客户端都会缓存分区的信息,分区数太多,占用内存也会越大
4)分区中只有leader副本对外提高读写,当broker发生故障,该节点上的分区将不可用,需要控制器选举新的leader,如果分区太多,导致大量的分区进行leader角色切换,导致短时间内不可用。
4.2 日志存储
日志被分为多个segment,以topic-partition命名,每个segment包括.log日志文件、.index稀疏索引文件、.timeindex时间戳索引文件,当日志达到一定的要求时,会产生新的segment:
1)当日志段文件的大小大于log.segment.bytes大小,默认1GB
2)日志分段中最大的时间戳与当前系统的时间戳大于log.roll.ms(优先级高)/hours:7天
3)索引文件达到log.index.size.max.bytes:10MB
4)追加的消息偏移量与日志段中最大的偏移量差值大于Integer.MAX_VALUE
稀疏索引文件的数据格式:
relativeOffset:相对偏移量,即对于baseOffset(当前文件的文件名)
potition:消息在日志文件中的物理位置
kafka使用ConcurrentSkipListMap保存各个日志分段,以baseOffset作为key。
比如ConcurrentSkipListMap保存的日志分段:00000000000000000000、00000000000100000000、00000000000200000000、00000000000300000000
现在想要查找偏移量为00000000000100010000的消息:首先在ConcurrentSkipListMap中找到00000000000100000000偏移量对应的稀疏索引文件,在索引文件中,通过二分法找到不大于30的数据18,然后根据对应的物理偏移量565在log文件中向下找到偏移量为30的消息。
时间戳索引数据格式:
timestamp:当前日志分段中最大的时间戳
relativeOffset:时间戳对应的相对偏移量
查找步骤:
1)将需要查找的targetTimeStamp和每个日志分段中最大的时间戳largestTimeStamp逐一对比,找到不小于targetTimeStamp的日志分段。日志分段中largestTimeStamp的计算是先查询该日志分段对应的时间戳索引文件,找到最后一条索引项,若该值大于0,取其值,否则取日志分段的最近修改时间。
2)找到日志分段文件后,在时间戳索引文件通过二分法查找到不大于targetTimeStamp的最大索引项,找打对应的相对偏移量,在索引文件中通过二分法找到不大于28的最大索引项,在通过日志分段文件找到对应的消息。
4.3 日志清理
kafka提高了两种日志清理策略。log.cleanup.policy=delete,compact
1)日志删除
a)基于时间保留策略:log.retention.hours:168
通过日志分段文件判断是否超过阈值,当前的时间与分段文件中largestTimeStamp(取对应的时间戳索引文件中最后一条索引项,如果大于0取该值,反之取分段文件的lastModifiedTime)比较。删除时,首先从跳跃表中移除待删除的日志分段,然后将对应的日志分段文件和索引文件后缀添加.delete。由delete-file命名的延迟任务删除以.delete为后缀的文件,file.delete.delay.ms参数:60000(1分钟);
b)基于日志大小保留策略:log.retention.bytes Log中所有日志文件的总大小:-1(无穷大)
首先计算日志文件的总大小和retensize的差值diff,即计算需要删除的日志总大小,然后从日志第一个日志分段开始查找可删除的分段文件的集合(剩余大小必须超过一个分段文件大小才能删除),之后的操作和基于时间一样。
c)基于日志起始偏移量的保留策略
基于日志起始偏移量logStartOffset策略判断依据时日志分段下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是,则可以删除该日志分段
eg:logstartOffset为35,遍历每一个日志段,日志段1下一个日志段的起始偏移量为11,小于35,加入删除集合,日志段2下一个日志段的起始偏移量为23小于35,加入删除集合,日志段3下一个日志段的起始偏移量为30小于35,加入删除集合,日志段4下一个日志段的起始偏移量为40大于35,所以,需要删除的日志段为1、2、3;
2)日志压缩
对于相同的key,不同的value值,只保留最后一个版本。
每个日志清理线程会使用SkimpyOffsetMap的对象存储key和offset的映射关系哈希表。日志清理会遍历两次日志文件,第一次遍历把所有的key的哈希值和最后出现offset保存在Map中,
第二次遍历会检查每个消息是否符合保留条件,即通过偏移量判断,不符合的被清理。
对日志分段文件进行分组:然后按照日志分段文件的顺序进行遍历,每组中的日志空间大小不超过log.segment.bytes,同一组日志清理后会将保留的消息复制到以.clean的临时文件中,文件名以第一个日志分段的文件名命名,
比如00000000000000000000.log.clean。Log Compaction过后将.clean文件修改为.swap,然后删除原来的日志文件,最后把.swap后缀去掉。
五、zookeeper
zookeeper在kafka集群中起着至关重要的角色,在2.8版本之前完全依赖于zookeeper,其中存在的问题有:
1)强依赖于zookeeper,kafka集群中的元数据存在zookeeper,需要额外的维护;
2)当controller宕机后,需要依赖于zookeeper重新选举,新的controller需要从zookeeper上获取元数据,且同步到各个broker,期间kafka是停止对外服务的;
3)当分区变多时,存储在zookeeper的数据变多,导致读写性能降低。zookeeper写操作是由leader完成,并且需要超过半数投票才能通过,并发写性能比较低
综上原因,kafka在2.8版本后由内部的quorum来替代zookeeper和controller,运行期只有一个活跃的controller,集群中的节点通过KRaft进行数据同步和交互,目前该版本只推荐在测试环境中使用。
六、常用配置
6.1 生产者基本参数
bootstrap.servers:kafka集群地址,host1:port1,host2:port2,非必须设置所有的地址,生产者会从给定的broker里查到其他broker信息
key.serializer/value.serializer:序列化方式
buffer.memory:RecordAccumulator:缓存的大小,默认为32MB,如果生产者发送的消息超过这个值,导致kafkaProducer的send方法阻塞或者异常,取决于max.block.ms的配置,默认为60000(60秒)
max.in.flight.requests.per.connection:客户端与Node之间的连接数,默认为5,超过该值,就不能向这个连接发送更多的请求了,除非有缓存请求收到响应,通过Deque的size大小与这个参数的来判断Node负载情况,最小的为leastLoadedNode
metadata.max.age.ms:300000,超过该值没有更新数据会引起元数据的更新操作。
ack:0表示发送消息到broker立即返回,1表示leader副本写入成功后立即返回,-1表示ISR集合中的所有副本写入成功后才能返回。
max.request.size:发送消息的最大值,默认为1MB,与broker中的message.max.bytes有联动
retries/retry.backoff.ms:重试次数,默认为0,以及重试时间间隔,默认为100
compression.type:消息压缩方式,默认为none
6.2 消费者基本参数
partition.assignment.strategy:分区分配策略
group.id:消费组名称
key. deserializer/value. deserializer:反序列化方式
client.id:consumer-webhook-group-LAPTOP-ED8SE5FV-4
fetch.min.bytes:poll()拉取最小数据量,默认1B
fetch.max.bytes:poll()拉取最大数据量,默认50MB
max.poll.records:一次拉取的最大消息数,默认为500条
enable.auto.commit:是否开启自动提交,默认为true
auto.commit.interval.ms:自动提交时间,默认为5s
auto.offset.reset:latest:kafka消费位置,如果有已提交的offset,则从offset开始消费,如果没有提交的offset,earliest表示从头开始消费,latest表示从新产生的位置开始消费
6.3 服务端基本参数
zookeeper.connect:连接的zookeeper集群服务地址,在设置的时候可以添加一个chroot路径,eg:localhost:2181,localhost:2182,localhost:2183/kafka
broker.id:指定kafka集群中broker的唯一标识,默认为-1
log.dir/log.dirs:日志文件存放的根目录
message.max.bytes:指定broker所能接收消息的最大值
auto.create.topics.enable:是否开启自动创建主题功能,默认为true
auto.leader.rebalance.enable:是否开启自动leader再均衡功能,默认为true
delete.topic.enable:是否可以删除主题,默认为true
log.retention.hours:日志保存时间,默认为168(7天)
log.retention.bytes:日志最大保留大小,默认为-1
七、常用命令
kafka-topics.sh --bootstrap-server localhost;9092 --create --topic topic-test --partitions 3 -replication-factor 1
创建主题,还可以通过--list、--describe、--alter、--delete进行删、改、查
kafak-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name topic-config
查看主题的配置,--entity-type指定查看配置的实体类型,包括topics、brokers、clients、users,--entity-name指定配置的名称,对应的主题名、brokerId值、clientId值、指定用户名
kafka-producer-perf-test.sh --topic topic-test --num-records 1000 --record-size 1024 --throughput -1 bootstrap.servers=127.0.0.1:9092
用来测试生产数据,--throughput 限制每秒发送的最大的消息数,设为 -1 表示不限制
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list:查看消费者列表 kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group:查看指定消费组详情
参考文献:《深入理解Kafka核心设计与实践原理》



