flink的datastream部署到线上时,发现数据只能写入到kafka的一些分区,其他分区没有数据写入。当把flink的并行度设置大于等于kafka的分区数时,kafka的分区都能写入数据。于是研究了一下源码。
FlinkFixedPartitioner源码:
package org.apache.flink.streaming.connectors.kafka.partitioner; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.Preconditions; @PublicEvolvingpublic class FlinkFixedPartitionerextends FlinkKafkaPartitioner { private static final long serialVersionUID = -3785320239953858777L; private int parallelInstanceId; public FlinkFixedPartitioner() { } public void open(int parallelInstanceId, int parallelInstances) { Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative."); Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0."); this.parallelInstanceId = parallelInstanceId; } public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { Preconditions.checkArgument((partitions != null) && (partitions.length > 0), "Partitions of the target topic is empty."); return partitions[this.parallelInstanceId % partitions.length]; } public boolean equals(Object o) { return (this == o) || o instanceof FlinkFixedPartitioner; } public int hashCode() { return FlinkFixedPartitioner.class.hashCode(); } }
关键代码: partitions[this.parallelInstanceId % partitions.length]
根据源码可以看出:
flink是根据sink的subtask的id 和kafka的partition数量进行取余计算出相应的分区值的。
计算过程如下:
flink并行度为3(F0,F1,F2),partition数量为2(P0,P1),则F0->P0,F1->P1,F2->P0
flink并行度为2(F0,F1),partition数量为3(P0,P1,P2),则F0->P0,F1->P1
因此默认分区器会有以下问题:
1、当 Sink 的并发度低于 Topic 的 partition 个数时,一个 sink task 写一个 partition,会导致部分 partition 完全没有数据,从而导致数据倾斜。
2、如果sink的并行度总数不是topic的partition的倍数时,还是会存在数据倾斜问题。
3、当 topic 的 partition 扩容时,则需要重启作业,以便发现新的 partition,否则新的分区也发现不了。
同时自己去查看flink的kafka connector,发现相关的connector有好几种,最新的flink1.12推荐使用新的jar包。
解决方法:org.apache.flink flink-connector-kafka_2.121.12.3
1、通过设置分区器值为null,走kafka的默认分区器来解决指定分区写不到数据的问题,
resultDs.addSink(new FlinkKafkaProducer<>(productTopic,new KafkaKeyedSerializationSchema(),getPropertiesFromBrokerList(productBootstrapServers),null,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,5))



