栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka消费者分区分配策略

kafka消费者分区分配策略

kafka消费者分区分配策略
  • 分区分配策略
    • RangeAssigor分配策略
    • RoundRobunAssignor分配策略
    • StickyAssignor分配策略
    • 自定义分配分区

分区分配策略

kafka消费者客户端提供了partition.assign-ment.strategy来设置消费者与订阅主题之间的分区分配策略。一共有三种分配策略RangeAssignor(默认策略)、RoundRobunAssignor、StickyAssignor
消费者客户端可以配置多个分配策略,彼此以逗号分割

RangeAssigor分配策略

按照消费者数量和分区数进行整除运算来获得一个跨度,按照跨度进行分配,保证分区均匀分配给消费者。n=分区数/消费者数量,m=分区数据%消费者数量,m个多余分区分配给前m个消费者,前m个消费者分配n+1个分区,后面的消费者分配n个分区
例:一共有两个消费者c0、c1和两个topict0、t1,t0和t1分别有三个分区,3/2 =1;3%2=1,所以c0承担t0里两个分区的消费和t1里两个分区的消费,最终消费结果为c0消费4个分区,c1消费两个分区,导致消费者分配分区不均匀的情况

RoundRobunAssignor分配策略

RoundRobunAssignor将消费者和消费者订阅的所有主题的分区按照字典排序,然后通过轮询依次将分区分配给消费者
例:一共有两个消费者c0、c1和两个topict0、t1,t0和t1分别有三个分区,则分区结果为c0:t0p0、t0p2、t1p1,c1:t0p1、t1p0、t1p2,相比RangeAssigor而言消费者分配到的分区更加均匀。
三个消费者c0、c1、c2和三个topict0、t1、t2分别有1、2、3个分区,c0订阅了t0、t1;c1订阅t0、t1;c3订阅了t0、t1、t2,分区分配结果如下:
c0:t0p0
c1:t1p0
c2: t1p1、t2p0、t2p1、t2p2

StickyAssignor分配策略

目的:1.分区尽可能均匀 2.再均衡时分配结果尽可能和上次相似。优点:重分配时可以减少不必要的分区移动。
例:三个消费者c0、c1、c2和三个topict0、t1、t2分别有1、2、3个分区,c0订阅了t0、t1;c1订阅t0、t1;c3订阅了t0、t1、t2,分区分配结果如下:
c0:t0p0
c1:t1p0、t1p1
c2: t2p0、t2p1、t2p2
假设c0被移除消费组,再均衡后的分区分配结果为
c1:t1p0、t1p1、t1p1
c2: t2p0、t2p1、t2p2

自定义分配分区

实现PartitionAssignor,subscription()、assign()、name()、onAssignment()
subscription为订阅的消费者
assign()为为消费者分配核心,放会List<消费者, List<主题分区>>
name 为自定义分配分区名称
消费者订阅每一个分区实现广播功能

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class BroadcastAssignor extends AbstractPartitionAssignor {

    private Map> consumersPerTopic(Map consumermetadata) {
        Map> res = new HashMap();
        Iterator var3 = consumermetadata.entrySet().iterator();

        while(var3.hasNext()) {
            Map.Entry subscriptionEntry = (Map.Entry)var3.next();
            String consumerId = subscriptionEntry.getKey();
            Iterator var6 = subscriptionEntry.getValue().topics().iterator();

            while(var6.hasNext()) {
                String topic = (String)var6.next();
                put(res, topic, consumerId);
            }
        }

        return res;
    }
    @Override
    public Map> assign(Map partitionsPerTopic, Map subscriptions) {
        // 获取每个主题有多少消费者订阅,key为主题
        Map> consumersPerTopic = this.consumersPerTopic(subscriptions);
        Map> resultAssign = new HashMap>();
        // 获取消费者
        Iterator var5 = subscriptions.keySet().iterator();

        while(var5.hasNext()) {
            String memberId = (String)var5.next();
            resultAssign.put(memberId, new ArrayList());
        }

        var5 = consumersPerTopic.entrySet().iterator();

        while(true) {
            String topic;
            List consumersForTopic;
            Integer numPartitionsForTopic;
            do {
                if (!var5.hasNext()) {
                    return resultAssign;
                }

                Map.Entry> topicEntry = (Map.Entry)var5.next();
                topic = topicEntry.getKey();
                consumersForTopic =  topicEntry.getValue();
                numPartitionsForTopic = partitionsPerTopic.get(topic);
            } while(numPartitionsForTopic == null);
            List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);

            for(int i =0; i < consumersForTopic.size(); i++) {
                ((List)resultAssign.get(consumersForTopic.get(i))).addAll(partitions);
            }
        }
    }

    @Override
    public String name() {
        return "broadcast";
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/670933.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号