栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

六、深入分析Fetch请求协议

六、深入分析Fetch请求协议

  • 此文章基于kafka2.5,协议版本为11
    为了能更加了解上一篇中的18b流量怎么来的,这里我们来深入分析一下
    首先来看FetchRequest的结构

FetchResponse的结构

18b流量的来源

想要了解18b流量就需要深入了解kafka.server.KafkaApis#sizeOfThrottledPartitions方法,代码如下

    private def sizeOfThrottledPartitions(versionId: Short,
                                        unconvertedResponse: FetchResponse[Records],
                                        quota: ReplicationQuotaManager): Int = {
    //1.1 构建SelectingIterator对象
    val iter = new SelectingIterator(unconvertedResponse.responseData, quota)
    //1.2 调用FetchResponse的sizeOf方法计算流量值
    FetchResponse.sizeOf(versionId, iter)
    }
构建SelectingIterator对象代码分析

这里我们着重展示一下hasNext方法,可以看到这是重写了hasNext方法,会根据quota.isThrottled(是否在限速副本中)来过滤数据

    override def hasNext: Boolean = {
      while ((nextElement == null) && iter.hasNext()) {
        val element = iter.next()
        if (quota.isThrottled(element.getKey)) {
          nextElement = element
        }
      }
      nextElement != null
    }
调用FetchResponse的sizeOf代码分析

想要搞清楚sizeOf的计算方式,可以先回顾一下前面 FetchResponse的结构。在sizeOf中可以看到是调用了toStruct的sizeOf方法,然后在这个基础上+4。

    public static  int sizeOf(short version,
                                                     Iterator>> partIterator) {
        // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
        // use arbitrary values here without affecting the result.
        return 4 + toStruct(version, 0, Errors.NONE, partIterator, INVALID_SESSION_ID).sizeOf();
    }
我们继续追踪sizeOf方法,可以看到就是调用这个Field的sizeOf方法来计算size,如果FetchResponse为空的话最后是这样的:{throttle_time_ms=0,error_code=0,session_id=0,responses=[]}

前面我们知道throttle_time_ms是int32,error_code是int16,session_id是int32,所以目前的size=4b+2b+4b=10b,再加上前面默认的4b,现在应有14b,还少了4b,这4b
就是在responses中计算来的,大家可以见第二段ArrayOf#sizeOf的计算方式,如果Object为空,也会默认给4b,所以在FetchResponse为空的情况下会有18b的流量值。

    public int sizeOf() {
    return this.schema.sizeOf(this);
    }
    //schema.sizeOf
    public int sizeOf(Object o) {
        int size = 0;
        Struct r = (Struct) o;
        for (BoundField field : fields) {
            try {
                size += field.def.type.sizeOf(r.get(field));
            } catch (Exception e) {
                throw new SchemaException("Error computing size for field '" + field.def.name + "': " +
                        (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        return size;
    }
    //org.apache.kafka.common.protocol.types.ArrayOf#sizeOf
    public int sizeOf(Object o) {
        int size = 4;
        if (o == null)
            return size;
    
        Object[] objs = (Object[]) o;
        for (Object obj : objs)
            size += type.sizeOf(obj);
        return size;
    }
broker与限速相关的配置
名称描述类型默认值有效值重要性
metric.reporters度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。list“”
metrics.num.samples维持计算度量的样本数int2[1,…]
metrics.recording.level指标的最高记录级别stringINFO
metrics.sample.window.ms计算度量样本的时间窗口long30000[1,…]
quota.window.num在内存中保留客户端限额的样本数int 11[1,…]
quota.window.size.seconds每个客户端限额的样本时间跨度int 1[1,…]
replication.quota.window.num在内存中保留副本限额的样本数int 11[1,…]
replication.quota.window.size.seconds每个副本限额样本数的时间跨度int 1[1,…]
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582427.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号