栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink写入到kafka,只写入指定分区问题排查

flink写入到kafka,只写入指定分区问题排查

背景:

flink的datastream部署到线上时,发现数据只能写入到kafka的一些分区,其他分区没有数据写入。当把flink的并行度设置大于等于kafka的分区数时,kafka的分区都能写入数据。于是研究了一下源码。

FlinkFixedPartitioner源码:

package org.apache.flink.streaming.connectors.kafka.partitioner;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;


@PublicEvolvingpublic
class FlinkFixedPartitioner extends FlinkKafkaPartitioner {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;

    public FlinkFixedPartitioner() {
    }

    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(parallelInstanceId >= 0,
            "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(parallelInstances > 0,
            "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(T record, byte[] key, byte[] value,
        String targetTopic, int[] partitions) {
        Preconditions.checkArgument((partitions != null) &&
            (partitions.length > 0), "Partitions of the target topic is empty.");

        return partitions[this.parallelInstanceId % partitions.length];
    }

    public boolean equals(Object o) {
        return (this == o) || o instanceof FlinkFixedPartitioner;
    }

    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}

关键代码:   partitions[this.parallelInstanceId % partitions.length]

根据源码可以看出:

flink是根据sink的subtask的id 和kafka的partition数量进行取余计算出相应的分区值的。

计算过程如下:

flink并行度为3(F0,F1,F2),partition数量为2(P0,P1),则F0->P0,F1->P1,F2->P0

flink并行度为2(F0,F1),partition数量为3(P0,P1,P2),则F0->P0,F1->P1

因此默认分区器会有以下问题:

1、当 Sink 的并发度低于 Topic 的 partition 个数时,一个 sink task 写一个 partition,会导致部分 partition 完全没有数据,从而导致数据倾斜。

2、如果sink的并行度总数不是topic的partition的倍数时,还是会存在数据倾斜问题。

3、当 topic 的 partition 扩容时,则需要重启作业,以便发现新的 partition,否则新的分区也发现不了。

同时自己去查看flink的kafka connector,发现相关的connector有好几种,最新的flink1.12推荐使用新的jar包。

    
   org.apache.flink     
   flink-connector-kafka_2.12 
1.12.3
解决方法:

1、通过设置分区器值为null,走kafka的默认分区器来解决指定分区写不到数据的问题,

 resultDs.addSink(new FlinkKafkaProducer<>(productTopic,new KafkaKeyedSerializationSchema(),getPropertiesFromBrokerList(productBootstrapServers),null,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,5))

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758102.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号