栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka 之 - Producer

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka 之 - Producer

1. 前言 why Kafka

支持多个生产者,分布式系统中往往有多个微服务向一个kafka集群发送消息;
支持多个消费者从一个Kafka集群中消费消息,可以通过消费者组实现多个消费者同时协作消费消息。
允许消费者非实时读取消息

高吞吐率的实现

1) 顺序读写

2) 零拷贝

1.1 基本术语 1.1.1 Partition

分区。topic中的消息被分割为一个或多个partition,其是一个物理概念,对应到系统上就是一个或若干个目录。

partition的数量设置为broker数量的整数倍。

一个partition在kafka上就是一个目录,目录名称为:[topicName]-[partitionNo]

一个partition可以被在不同消费者组的消费者同时消费。

多个partition也可以被同一个消费者消费。

1.1.2 segment

段。将partition进一步细分为了若干的segment,每个segment文件的最大大小相等。

segment是一个逻辑概念,其由两类物理文件组成,分别为“.index”文件和“.log”文件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uTxFzLkC-1641959484486)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102181301343.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Aj0RFVNe-1641959484489)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103173910125.png)]

查看segment ?

对于segment中的log文件,不能直接通过cat命令查找其内容,而是需要通过kafka自带的一个工具查看。

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/data/kafka-logs/dda-0/00000000000000000000.log --print-data-log

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BEshOsks-1641959484490)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103175728355.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xwg2jebe-1641959484490)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103175933164.png)]

1.1.3 consumer group

组内可以有多个消费者,他们共享一个group ID。

Kafka在稳定状态下,一个partition中的消息只能被容易个consumer group中的一个consumer消费,一个组内consumer只会消费某一个或特定几个的partition。

一个消息可以同时被多个consumer group消费。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wz9pdGch-1641959484491)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102191201638.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4sllfevV-1641959484492)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102191224870.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AnNeszWE-1641959484493)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220102191240725.png)]

1.1.4 Broker Controller

Kafka集群的多个broker中,有一个会被选举为controller,负责管理整个集群中partition和副本replicas的状态。

Kafka集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller 来试图让自己成为controller,其他节点会在该节点消失时收到通知,然后分别再向zk中写/controller节点,只有一个能成功,其他节点只能监听该节点。

1.1.5 Zookeeper

Zookeeper负责维护和协调broker,负责broker controller的选举。

1.1.6 Coordinator

其实是指运行在每一个broker上的group coordinator 进程。用于管理consumer group 中的各个成员,主要用于offset位移管理和Rebalance。一个Coordinator可以管理多个消费者组。

1.1.7 offset

偏移量。每条消息都有一个当前Partition下唯一的64字节的offset,它是相对于当前分区第一条消息的偏移量。

1.1.8 offset commit

Consumer从partition中取出一批消息写入到buffer对其进行消费,在规定时间内消费完消息后,会自动将其消费消息的offset提交给broker,以让broker记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,其是不会提交offset的。

在进行offset commit时,offset也是以消息的形式写入到了_consumer_offset主题的patition中的。系统会为每个提交的offset生成一个key,该key的hash值与50取模

1.1.9 HW与LEO

HW, highWatermark,高水位,表示consumer可以消费的最高patition偏移量。HW保证了Kafka集群正常状态下,partition的follower与leader中消息的一致性。

LEO,log end offset,日志最后消息的偏移量。

对于leader新写入的消息,consumer是不能立刻消费的。leader会等待该消息被所有ISR中的partition follower同步后才会更新HW,此时消息才能被consumer消费。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Org3dLa-1641959484494)(C:UsersEllen LiuAppDataRoamingTyporatypora-user-imagesimage-20220103153848957.png)]

1.1.10 ISR

ISR,In-Sync Replicas,是指副本同步列表。

1.1.11 Rebalance

当下面情况发生时,partition的所有权在消费者间转移,即partition会被重新分配

消费者数量发生变化partition数量发生变化

当发生rebalance时,consumer无法读取消息,即broker集群有一小段时间不可用。所以,rebalance会为消费者组及broker集群带来高可用性和伸缩性,但需要避免不必要的rebalance。

2. 基本操作 2.1 启动并查看是否启动成功:
cd /opt/apps/kafka
bin/kafka-server-start.sh -daemon config/server.properties
netstat -ant

2.2 查看topic
[root@ip-172-31-18-183 kafka]# bin/kafka-topics.sh --list --bootstrap-server 172.31.18.183:9092

2.3 创建topic
[root@ip-172-31-28-122 kafka]# bin/kafka-topics.sh --create --bootstrap-server 172.31.28.122:9092 --replication-factor 2 --partitions 3 --topic payment

2.4 查看3个brokers中的分区:

broker-0:

payment-0为主副本


broker-1:

payment-2为主副本

broker-2:

payment-1为主副本

3. 工作原理 3.1 消息的可靠机制-生产者

生产者向kafka发送消息时,可以选择需要的可靠性级别。通过acks参数的值进行设置。
(1) 0值
异步发送。生产者向kafka发送消息而不需要kafka反馈成功ack。该方式效率最高,但可靠性最低。其可能会存在消息丢失的情况。
(2) 1值
同步发送,默认值。生产者发送消息给kafka,broker的partition leader在收到消息后马上发送成功ack(无需等待ISR中的follower同步完成),生产者收到后知道消息发送成功,然后会再发送消息。如果一直未收到kafka的ack,则生产者会认为消息发送失败,会重发消息。

无法保证消息发送成功,但可以确认发送失败。

(3) -1值
同步发送。其值等同于all。生产者发送消息给kafka,kafka收到消息后要等到ISR列表中的所有副本都同步消息完成后,才向生产者发送成功ack。如果一直未收到kafka的ack,则认为消息发送失败,会自动重发消息。

可能会出现部分follower消息重复接收(不是重复消费!):当leader接收消息A后,部分follower同步成功时,leader宕机,该设置生产者会认为消息未发送成功,会再次尝试发送消息A,同步成功的follower会重复接收消息A。

解决办法:每条消息携带一个id,自定义去重机制。

3.2 Partition Leader选举范围

当leader挂了后broker controller会从ISR中选一个follower成为新的leader。但,若ISR中的所有副本都挂了怎么办?可以通过unclean.leader.election.enable的取值来设置Leader选举的范围。
(1) false
必须等待ISR列表中有副本活过来才进行新的选举。该策略可靠性有保证,但可用性低。
(2) true
在ISR中没有副本的情况下可以选择任何一个没有宕机主机中该topic的partition副本作为新的leader,该策略可用性高,但可靠性没有保证。可能发生消息丢失

3.3 HW截断机制

如果partition leader接收到了新的消息, ISR中其它Follower正在同步过程中,还未同步完毕时leader挂了。此时就需要选举出新的leader。若没有HW截断机制,将会导致partition中leader与follower数据的不一致。

3.4 Partition Leader选举范围

当leader挂了后broker controller会从ISR中选一个follower成为新的leader。但,若ISR中的所有副本都挂了怎么办?可以通过unclean.leader.election.enable的取值来设置Leader选举的范围。
(1) false
必须等待ISR列表中有副本活过来才进行新的选举。该策略可靠性有保证,但可用性低。
(2) true
在ISR中没有副本的情况下可以选择任何一个没有宕机主机中该topic的partition副本作为新的leader,该策略可用性高,但可靠性没有保证。OSR中的broker消息不全,可能会发生消息丢失。

3.5 重复消费问题解决 同一个consumer重复消费

当消费者消费能力低,且发生自动提交超时时,consumer会向broker提交一个异常。此时,consumer会再次尝试进行消息消费,从之前的offset开始进行poll消息,产生了消息重复消费

解决方案:手动提交

不同的consumer重复消费

当消费了消息但还未提交offset时宕机,则这些已经被消费的消息会被其他consumer重复消费。

若将自动提交的时间改长,虽然可以降低一个consumer的重复消费的可能,但增加了不同consumer重复消费的可能。

4. 查看zookeeper

链接Zookeeper

zkCli.sh

5. producer 5.1 参数 5.1.2 bootstrap.servers

指定broker的地址清单。该清单不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息,建议提供两个broker,避免单点故障。

5.1.3 key.serializer

必须被设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。

5.1.4 value.serializer 5.1.5 acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
camel的默认设置为 acks = 1

5.1.6 buffer.memory

camel设置:32M

5.1.7 compression.type

发送前是否被压缩
camel设置:none

5.1.8 retries: 0

默认:

5.1.9 batch.size: 16k

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。

5.1.10 max.block.ms: 60s

该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常

5.1.11 max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

5.2 分区 5.2.1 概念

同一个partition中的消息顺序消费
不同partition中的消息消费顺序不保证
3个broker, 6个partition, 3个副本,在kafka cluster上分布如下图所示:

5.2.2 分区器 5.2.2.1 producer如何决定发送消息的分区?

当key为空时,消息随机发送到各个分区(version 2.7.2)
当key不为空时,key的散列值对partion的个数取模

5.2.2.2 源码

PartitionInfo对象

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;//分区的所有副本
    private final Node[] inSyncReplicas;//分区中处于ISR的副本
    private final Node[] offlineReplicas;
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {//消息没有key,则随机分布
        return stickyPartitionCache.partition(topic, cluster);
    }
    // 将key序列化后生成一个散列码,用这个散列码对分区数取余
    // hash the keyBytes to choose a partition
    // Utils.murmur2(keyBytes): Generates 32 bit murmur2 hash from byte array
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}


public class StickyPartitionCache {
    private final ConcurrentMap indexCache;//为任意topic缓存当前sticky分区
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }
	public int partition(String topic, Cluster cluster) {
	    Integer part = indexCache.get(topic);
	    if (part == null) {
	        return nextPartition(topic, cluster, -1);
	    }
	    return part;
	}
	
	public int nextPartition(String topic, Cluster cluster, int prevPartition) {
	    List partitions = cluster.partitionsForTopic(topic);
	    Integer oldPart = indexCache.get(topic);
	    Integer newPart = oldPart;
	    // Check that the current sticky partition for the topic is either not set or that the partition that 
	    // triggered the new batch matches the sticky partition that needs to be changed.
	    if (oldPart == null || oldPart == prevPartition) {
	        List availablePartitions = cluster.availablePartitionsForTopic(topic);
	        if (availablePartitions.size() < 1) {
	            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
	            newPart = random % partitions.size();
	        } else if (availablePartitions.size() == 1) {
	            newPart = availablePartitions.get(0).partition();
	        } else {//在可用分区列表中随机分布
	            while (newPart == null || newPart.equals(oldPart)) {
	                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
	                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
	            }
	        }
	        // only change the sticky partition if it is null or prevPartition matches the current sticky partition.
	        if (oldPart == null) {
	            indexCache.putIfAbsent(topic, newPart);
	        } else {
	            indexCache.replace(topic, prevPartition, newPart);
	        }
	        return indexCache.get(topic);
	    }
	    return indexCache.get(topic);
	}

3个broker,6个分区,3个副本;下图可见共有6个对应的分区信息对象。

5.3. 记录收集器


消息经过分区器后缓存到记录收集器RecordAccumulator中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
作用:缓存消息,减少网络传输
相关参数 - buffer.memory:缓存大小,默认为32MB
每个分区都有一个Deque,producerBatch满了就会被发送线程发送到分区对应的节点;否则继续等待直到收集到足够的消息。
追加消息时,首先要获取分区所属队列Deque,若没有,则新建。
然后获取队列Deque中的最后一个批记录,若队列中还没有批记录或批记录已满,则新建并加入队列尾部。

5.3.1 源码

下面省略了其他代码,只保留核心代码部分:

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs) throws InterruptedException {
//....省略
        try {
            // check if we have an in-progress batch
            //获取分区的双端队列:寻找与消息分区所对应的双端队列(如果没有则新建)
            Deque dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            //没有可被填充的record batch, 创建一个新的record batch ,最后一个入参 abortForNewBatch为true,说明后续会将消息附加到这个新的record batch上
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }
 //....省略
 }

private Deque getOrCreateDeque(TopicPartition tp) {
    Deque d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}


private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque deque, long nowMs) {
    ProducerBatch last = deque.peekLast();//获取分区的最后一个ProducerBatch
    if (last != null) {
        //将当前message尝试附加到获取到的最后一个ProducerBatch
        FutureRecordmetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null)
            //如果Deque已满
            //则需要释放资源,压缩buffers, 关闭producer batch
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
5.4. 客户端消息发送线程 5.4.1 从记录收集器获取数据

获取已准备好发送分区的节点列表;
若分区的leader未知,则更新元数据;
移除网络层还没有准备好的节点;
从记录收集器获取按照节点node将消息重新分组的Map>

5.4.2 创建生产者客户端请求

为每一个客户端节点node创建一个客户端请求
每个ProducerBatch都有唯一的 topicPartition;ProducerBatch的record是MemoryRecords;
将每个node中的所有的producerBatch按照TopicPartition放入Map 中;
最后封装为客户端请求ClientRequest,回调函数作为客户端请求的一个变量,客户端请求完成后,会调用回调函数;
将给定的请求放入inFlightRequests列表等待发送。 请求只能发送到就绪节点。 InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。

把请求交给客户端网络对象NetworkClient执行真正的网络读写请求,将请求真正发送出去;


待续

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/704926.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号