支持多个生产者,分布式系统中往往有多个微服务向一个kafka集群发送消息;
支持多个消费者从一个Kafka集群中消费消息,可以通过消费者组实现多个消费者同时协作消费消息。
允许消费者非实时读取消息
高吞吐率的实现
1) 顺序读写
2) 零拷贝
1.1 基本术语 1.1.1 Partition分区。topic中的消息被分割为一个或多个partition,其是一个物理概念,对应到系统上就是一个或若干个目录。
partition的数量设置为broker数量的整数倍。
一个partition在kafka上就是一个目录,目录名称为:[topicName]-[partitionNo]
一个partition可以被在不同消费者组的消费者同时消费。
多个partition也可以被同一个消费者消费。
1.1.2 segment段。将partition进一步细分为了若干的segment,每个segment文件的最大大小相等。
segment是一个逻辑概念,其由两类物理文件组成,分别为“.index”文件和“.log”文件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uTxFzLkC-1641959484486)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102181301343.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Aj0RFVNe-1641959484489)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103173910125.png)]
查看segment ?对于segment中的log文件,不能直接通过cat命令查找其内容,而是需要通过kafka自带的一个工具查看。
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/data/kafka-logs/dda-0/00000000000000000000.log --print-data-log
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BEshOsks-1641959484490)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103175728355.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xwg2jebe-1641959484490)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103175933164.png)]
1.1.3 consumer group组内可以有多个消费者,他们共享一个group ID。
Kafka在稳定状态下,一个partition中的消息只能被容易个consumer group中的一个consumer消费,一个组内consumer只会消费某一个或特定几个的partition。
一个消息可以同时被多个consumer group消费。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wz9pdGch-1641959484491)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102191201638.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4sllfevV-1641959484492)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102191224870.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AnNeszWE-1641959484493)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102191240725.png)]
1.1.4 Broker ControllerKafka集群的多个broker中,有一个会被选举为controller,负责管理整个集群中partition和副本replicas的状态。
Kafka集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller 来试图让自己成为controller,其他节点会在该节点消失时收到通知,然后分别再向zk中写/controller节点,只有一个能成功,其他节点只能监听该节点。
1.1.5 ZookeeperZookeeper负责维护和协调broker,负责broker controller的选举。
1.1.6 Coordinator其实是指运行在每一个broker上的group coordinator 进程。用于管理consumer group 中的各个成员,主要用于offset位移管理和Rebalance。一个Coordinator可以管理多个消费者组。
1.1.7 offset偏移量。每条消息都有一个当前Partition下唯一的64字节的offset,它是相对于当前分区第一条消息的偏移量。
1.1.8 offset commitConsumer从partition中取出一批消息写入到buffer对其进行消费,在规定时间内消费完消息后,会自动将其消费消息的offset提交给broker,以让broker记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,其是不会提交offset的。
在进行offset commit时,offset也是以消息的形式写入到了_consumer_offset主题的patition中的。系统会为每个提交的offset生成一个key,该key的hash值与50取模
1.1.9 HW与LEOHW, highWatermark,高水位,表示consumer可以消费的最高patition偏移量。HW保证了Kafka集群正常状态下,partition的follower与leader中消息的一致性。
LEO,log end offset,日志最后消息的偏移量。
对于leader新写入的消息,consumer是不能立刻消费的。leader会等待该消息被所有ISR中的partition follower同步后才会更新HW,此时消息才能被consumer消费。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Org3dLa-1641959484494)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103153848957.png)]
1.1.10 ISRISR,In-Sync Replicas,是指副本同步列表。
1.1.11 Rebalance当下面情况发生时,partition的所有权在消费者间转移,即partition会被重新分配
消费者数量发生变化partition数量发生变化
当发生rebalance时,consumer无法读取消息,即broker集群有一小段时间不可用。所以,rebalance会为消费者组及broker集群带来高可用性和伸缩性,但需要避免不必要的rebalance。
2. 基本操作 2.1 启动并查看是否启动成功:cd /opt/apps/kafka bin/kafka-server-start.sh -daemon config/server.properties netstat -ant2.2 查看topic
[root@ip-172-31-18-183 kafka]# bin/kafka-topics.sh --list --bootstrap-server 172.31.18.183:90922.3 创建topic
[root@ip-172-31-28-122 kafka]# bin/kafka-topics.sh --create --bootstrap-server 172.31.28.122:9092 --replication-factor 2 --partitions 3 --topic payment2.4 查看3个brokers中的分区:
broker-0:
payment-0为主副本
broker-1:
payment-2为主副本
broker-2:
payment-1为主副本
3. 工作原理 3.1 消息的可靠机制-生产者生产者向kafka发送消息时,可以选择需要的可靠性级别。通过acks参数的值进行设置。
(1) 0值
异步发送。生产者向kafka发送消息而不需要kafka反馈成功ack。该方式效率最高,但可靠性最低。其可能会存在消息丢失的情况。
(2) 1值
同步发送,默认值。生产者发送消息给kafka,broker的partition leader在收到消息后马上发送成功ack(无需等待ISR中的follower同步完成),生产者收到后知道消息发送成功,然后会再发送消息。如果一直未收到kafka的ack,则生产者会认为消息发送失败,会重发消息。
无法保证消息发送成功,但可以确认发送失败。
(3) -1值
同步发送。其值等同于all。生产者发送消息给kafka,kafka收到消息后要等到ISR列表中的所有副本都同步消息完成后,才向生产者发送成功ack。如果一直未收到kafka的ack,则认为消息发送失败,会自动重发消息。
可能会出现部分follower消息重复接收(不是重复消费!):当leader接收消息A后,部分follower同步成功时,leader宕机,该设置生产者会认为消息未发送成功,会再次尝试发送消息A,同步成功的follower会重复接收消息A。
解决办法:每条消息携带一个id,自定义去重机制。
3.2 Partition Leader选举范围当leader挂了后broker controller会从ISR中选一个follower成为新的leader。但,若ISR中的所有副本都挂了怎么办?可以通过unclean.leader.election.enable的取值来设置Leader选举的范围。
(1) false
必须等待ISR列表中有副本活过来才进行新的选举。该策略可靠性有保证,但可用性低。
(2) true
在ISR中没有副本的情况下可以选择任何一个没有宕机主机中该topic的partition副本作为新的leader,该策略可用性高,但可靠性没有保证。可能发生消息丢失
如果partition leader接收到了新的消息, ISR中其它Follower正在同步过程中,还未同步完毕时leader挂了。此时就需要选举出新的leader。若没有HW截断机制,将会导致partition中leader与follower数据的不一致。
3.4 Partition Leader选举范围当leader挂了后broker controller会从ISR中选一个follower成为新的leader。但,若ISR中的所有副本都挂了怎么办?可以通过unclean.leader.election.enable的取值来设置Leader选举的范围。
(1) false
必须等待ISR列表中有副本活过来才进行新的选举。该策略可靠性有保证,但可用性低。
(2) true
在ISR中没有副本的情况下可以选择任何一个没有宕机主机中该topic的partition副本作为新的leader,该策略可用性高,但可靠性没有保证。OSR中的broker消息不全,可能会发生消息丢失。
当消费者消费能力低,且发生自动提交超时时,consumer会向broker提交一个异常。此时,consumer会再次尝试进行消息消费,从之前的offset开始进行poll消息,产生了消息重复消费
解决方案:手动提交
不同的consumer重复消费当消费了消息但还未提交offset时宕机,则这些已经被消费的消息会被其他consumer重复消费。
若将自动提交的时间改长,虽然可以降低一个consumer的重复消费的可能,但增加了不同consumer重复消费的可能。
4. 查看zookeeper链接Zookeeper
zkCli.sh5. producer 5.1 参数 5.1.2 bootstrap.servers
指定broker的地址清单。该清单不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息,建议提供两个broker,避免单点故障。
5.1.3 key.serializer必须被设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。
5.1.4 value.serializer 5.1.5 acksacks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
camel的默认设置为 acks = 1
camel设置:32M
5.1.7 compression.type发送前是否被压缩
camel设置:none
默认:
5.1.9 batch.size: 16k当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。
5.1.10 max.block.ms: 60s该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常
5.1.11 max.in.flight.requests.per.connection该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
5.2 分区 5.2.1 概念同一个partition中的消息顺序消费
不同partition中的消息消费顺序不保证
3个broker, 6个partition, 3个副本,在kafka cluster上分布如下图所示:
当key为空时,消息随机发送到各个分区(version 2.7.2)
当key不为空时,key的散列值对partion的个数取模
PartitionInfo对象
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;//分区的所有副本
private final Node[] inSyncReplicas;//分区中处于ISR的副本
private final Node[] offlineReplicas;
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {//消息没有key,则随机分布
return stickyPartitionCache.partition(topic, cluster);
}
// 将key序列化后生成一个散列码,用这个散列码对分区数取余
// hash the keyBytes to choose a partition
// Utils.murmur2(keyBytes): Generates 32 bit murmur2 hash from byte array
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
public class StickyPartitionCache {
private final ConcurrentMap indexCache;//为任意topic缓存当前sticky分区
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// Check that the current sticky partition for the topic is either not set or that the partition that
// triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) {
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {//在可用分区列表中随机分布
while (newPart == null || newPart.equals(oldPart)) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}
3个broker,6个分区,3个副本;下图可见共有6个对应的分区信息对象。
消息经过分区器后缓存到记录收集器RecordAccumulator中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
作用:缓存消息,减少网络传输
相关参数 - buffer.memory:缓存大小,默认为32MB
每个分区都有一个Deque
追加消息时,首先要获取分区所属队列Deque,若没有,则新建。
然后获取队列Deque中的最后一个批记录,若队列中还没有批记录或批记录已满,则新建并加入队列尾部。
下面省略了其他代码,只保留核心代码部分:
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
//....省略
try {
// check if we have an in-progress batch
//获取分区的双端队列:寻找与消息分区所对应的双端队列(如果没有则新建)
Deque dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
//没有可被填充的record batch, 创建一个新的record batch ,最后一个入参 abortForNewBatch为true,说明后续会将消息附加到这个新的record batch上
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}
//....省略
}
private Deque getOrCreateDeque(TopicPartition tp) {
Deque d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque deque, long nowMs) {
ProducerBatch last = deque.peekLast();//获取分区的最后一个ProducerBatch
if (last != null) {
//将当前message尝试附加到获取到的最后一个ProducerBatch
FutureRecordmetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null)
//如果Deque>已满
//则需要释放资源,压缩buffers, 关闭producer batch
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
return null;
}
5.4. 客户端消息发送线程
5.4.1 从记录收集器获取数据
获取已准备好发送分区的节点列表;
若分区的leader未知,则更新元数据;
移除网络层还没有准备好的节点;
从记录收集器获取按照节点node将消息重新分组的Map
为每一个客户端节点node创建一个客户端请求
每个ProducerBatch都有唯一的 topicPartition;ProducerBatch的record是MemoryRecords;
将每个node中的所有的producerBatch按照TopicPartition放入Map
最后封装为客户端请求ClientRequest,回调函数作为客户端请求的一个变量,客户端请求完成后,会调用回调函数;
将给定的请求放入inFlightRequests列表等待发送。 请求只能发送到就绪节点。 InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。
把请求交给客户端网络对象NetworkClient执行真正的网络读写请求,将请求真正发送出去;
待续



