栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

(5.2)kafka消费者源码——Fetcher类

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

(5.2)kafka消费者源码——Fetcher类

1:Fetcher类

此类管理broker上的的提取过程。
Fetcher类的主要功能是发送Fetcher请求,获取指定消息集合,处理FetchResponse,更新消息位置。

1.1:属性
 private final ConsumerNetworkClient client;    //client负责网络通信

    private final Time time;
    private final int minBytes;    //在服务端收到FetchRequest之后并不是立即响应,而是当可返回的消息数据累计到至少minBytes个字节时才响应。这样每个FetchResponse中就包含了多个消息,提供效率。

    private final int maxBytes;
    private final int maxWaitMs;    //等待FetchResponse的最长时间

    private final int fetchSize;    //每次fetch操作的最大字节数

    private final long retryBackoffMs;    //重试等待时间

    private final long requestTimeoutMs;    //fetch请求超时时间

    private final int maxPollRecords;    //每次获取record的最大数量

    private final boolean checkCrcs;
    private final Metadata metadata;    //记录kakfa集群的元数据

    private final FetchManagerMetrics sensors;
    private final SubscriptionState subscriptions;    //记录topic的订阅情况,每个TopicPartition的消费情况,偏移量记录等

    //已经完成获取请求的线程安全的队列
    private final ConcurrentLinkedQueue completedFetches;
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
    //数据key的反序列化
    private final ExtendedDeserializer keyDeserializer;
    private final ExtendedDeserializer valueDeserializer;
    private final IsolationLevel isolationLevel;
    private final Map sessionHandlers;
    private final AtomicReference cachedListOffsetsException = new AtomicReference<>();

    //PartitionRecrods类型,保存了CompleteFetch解析后的结果集合,它有三个主要字段
    
    //记录那个分区,消费的偏移量,数据条数等元数据信息
    private PartitionRecords nextInLineRecords = null;
1.2:方法
  • OffsetData :获取偏移量
  • sendFetches:向分配分区的所在节点发送fetch请求,拉取数据
  • fetchedRecords: 处理已经获取的每个分区的记录,清空记录缓冲区并更新消耗位置。
  
    private static class OffsetData {
        final long offset;
        final Long timestamp; //  null if the broker does not support returning timestamps

        OffsetData(long offset, Long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

向分配分区的所在节点发送fetch请求,拉取数据:sendFetches

  public int sendFetches() {
        // 定义一个节点和请求数据的map集合,存储节点和节点的连接会话
        // 所有节点创建获取请求,我们为这些节点分配了分区,这些分区没有正在运行的现有请求。
        Map fetchRequestMap = prepareFetchRequests();
        for (Map.Entry entry : fetchRequestMap.entrySet()) {
            //获取节点
            final Node fetchTarget = entry.getKey();
            // 获取和节点的连接会话
            final FetchSessionHandler.FetchRequestData data = entry.getValue();

            // 根据连接会话创建一个获取数据的请求
            final FetchRequest.Builder request = FetchRequest.Builder
                    .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                    .isolationLevel(isolationLevel)
                    .setMaxBytes(this.maxBytes) //一次拉取的最大字节数50M
                    .metadata(data.metadata())
                    .toForget(data.toForget());
            if (log.isDebugEnabled()) {
                log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
            }
            //把发往每个Node的FetchRequest都缓存到unsent队列上
            client.send(fetchTarget, request) //添加Listener监听,这也是处理FetchResponse的入口
                    .addListener(new RequestFutureListener() {
                        @Override
                        public void onSuccess(ClientResponse resp) { //请求成功了,处理返回的响应
                            FetchResponse response = (FetchResponse) resp.responseBody();
                            FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                            if (handler == null) {
                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                    fetchTarget.id());
                                return;
                            }
                            if (!handler.handleResponse(response)) {
                                return;
                            }
                            // 获取响应数据的主题分区对象,放入set集合中
                            Set partitions = new HashSet<>(response.responseData().keySet());
                            // 这是静态内部类
                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                            //遍历响应中的数据
                            for (Map.Entry entry : response.responseData().entrySet()) {
                                //获取分区
                                TopicPartition partition = entry.getKey();
                                // 获取偏移量
                                long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                                // 获取分区数据,里面包括消费的数据
                                FetchResponse.PartitionData fetchData = entry.getValue();

                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                        isolationLevel, fetchOffset, partition, fetchData);
                                //创建completedFetch,把拉取完成数据缓存到completedFetch队列中
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                        resp.requestHeader().apiVersion()));
                            }

                            sensors.fetchLatency.record(resp.requestLatencyMs());
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                            if (handler != null) {
                                handler.handleError(e);    
                            }
                        }
                    });
        }
        return fetchRequestMap.size();
    }

fetchedRecords: 处理已经获取的每个分区的记录,清空记录缓冲区并更新消耗位置。

    public Map>> fetchedRecords() {
    // 定义一个消费的主题分区和数据map集合
        Map>> fetched = new HashMap<>();
        int recordsRemaining = maxPollRecords;     //记录剩余可拉取的记录条数,每拉取一次被更新一次

        try {
            while (recordsRemaining > 0) {// 只要需要拉取记录数大于0,就继续拉取
                // 1:判断分区记录是否为空或者该分区是否被获取过
                if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                    // completedFetches存储已经拉取的数据。获取但不删除队列的头元素,如果不为空则是已经拉取的数据
                    CompletedFetch completedFetch = completedFetches.peek();

                    //为空说明没有被拉取过的数据,break
                    if (completedFetch == null) break;

                    //如果不是空解析一个completedFetches得到一个PartitionRecords对象。
                    // 记录那个分区,获取了多少记录,偏移量是多少
                    nextInLineRecords = parseCompletedFetch(completedFetch);
                    //poll是从队列取出拉取的批数据并且删除
                    completedFetches.poll();
                } else {//:2获取消费数据集
                    List> records = fetchRecords(nextInLineRecords, recordsRemaining);

                    // 获取主题分区
                    TopicPartition partition = nextInLineRecords.partition;if (!records.isEmpty()) {
                        //该分区的数据
                        List> currentRecords = fetched.get(partition);
                        if (currentRecords == null) {
                            fetched.put(partition, records);
                        } else {
                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
                            // but it might conceivably happen in some rare cases (such as partition leader changes).
                            // we have to copy to a new list because the old one may be immutable
                            List> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                            newRecords.addAll(currentRecords);
                            newRecords.addAll(records);
                            fetched.put(partition, newRecords);
                        }

                        // 可消费的剩下数据条数减去刚才消费到的数据集的大小
                        recordsRemaining -= records.size();
                    }
                }
            }
        } catch (KafkaException e) {
            if (fetched.isEmpty())
                throw e;
        }
        return fetched;
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1032964.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号