栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka RoundRobinPartitioner发送消息分区不均

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka RoundRobinPartitioner发送消息分区不均

问题原因

是生产者“doSend”方法导致了这个问题。在“doSend”方法中,当累加器返回带有 abortForNewBatch 标志且值为真值的结果时,doSend 方法再次调用“partition”方法并且之前选择的分区保持未使用。如果主题只有两个分区,则此问题很危险,因为在这种情况下,将只使用一个分区。

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
    
                if (result.abortForNewBatch) {
                    int prevPartition = partition;
                    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                    partition = partition(record, serializedKey, serializedValue, cluster);
                    tp = new TopicPartition(record.topic(), partition);
                    if (log.isTraceEnabled()) {
                        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                    }
                    // producer callback will make sure to call both 'callback' and interceptor callback
                    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    
                    result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
                }
...
解决方法

这个问题将通过使用像这样的自定义循环分区器来解决

package com.summer.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyRoundRobin implements Partitioner {

    private final ConcurrentMap topicCounterMap = new ConcurrentHashMap<>();
    private final ConcurrentMap unusedPartition = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (unusedPartition.containsKey(topic))
            return unusedPartition.remove(topic).get();

        return nextPartition(topic, cluster);
    }

    public int nextPartition(String topic, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = counterNextValue(topic);
        List availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int counterNextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map configs) {
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        unusedPartition.put(topic, new AtomicInteger(prevPartition));
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/439417.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号