本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.
假如 Flink 任务某个算子的并行度为4,该算子的状态做了 checkpoint 后任务停止。该算子的并行度调整为2后重新启动任务,新 task 中的各个 subtask 需要从 checkpoint 中哪些旧的 subtask 中获取数据呢?
Flink 提供了 SubtaskStateMapper 枚举类来定义上述行为。
其核心方法为:
public abstract int[] getOldSubtasks( int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks);
其中,newSubtaskIndex 为新 subtask 的索引,oldNumberOfSubtasks 为旧 subtask 的数量,newNumberOfSubtasks 为新 subtask 的数量,返回值为旧 subtask 索引的数组。
SubtaskStateMapper 提供了 ARBITRARY、ROUND_ROBIN、FIRST、FULL、RANGE、UNSUPPORTED 等6类实现。
ARBITRARYARBITRARY {
@Override
public int[] getOldSubtasks(
int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
return ROUND_ROBIN.getOldSubtasks(
newSubtaskIndex, oldNumberOfSubtasks, newNumberOfSubtasks);
}
}
本质上调用的是 ROUND_ROBIN 的方法实现。
ROUND_ROBIN旧 subtask 的数据会依次轮询发送到新 subtask 上。
ROUND_ROBIN {
@Override
public int[] getOldSubtasks(
int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
final IntArrayList subtasks =
new IntArrayList(oldNumberOfSubtasks / newNumberOfSubtasks + 1);
for (int subtask = newSubtaskIndex;
subtask < oldNumberOfSubtasks;
subtask += newNumberOfSubtasks) {
subtasks.add(subtask);
}
return subtasks.toArray();
}
}
缩容场景,假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2。
扩容场景,设 oldNumberOfSubtasks 为2,newNumberOfSubtasks 为4。
所有旧 subtask 的数据均发送到第1个新 subtask 上。
FIRST {
@Override
public int[] getOldSubtasks(
int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
// 如果新 subtask 的索引为0,则返回所有旧 subtask 的索引集合
// 如果新 subtask 的索引不为0,则直接返回 EMPTY,即 int[0] 空数组
return newSubtaskIndex == 0 ? IntStream.range(0, oldNumberOfSubtasks).toArray() : EMPTY;
}
}
private static final int[] EMPTY = new int[0];
假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2。
所有旧 subtask 的数据均发送到每1个新 subtask 上。
FULL {
@Override
public int[] getOldSubtasks(
int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
// 所有新 subtask 均对应所有旧 subtask 的索引集合
return IntStream.range(0, oldNumberOfSubtasks).toArray();
}
@Override
public boolean isAmbiguous() {
return true;
}
}
假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2。
旧 subtask 的数据通过索引段的形式发送到新的 subtask 上。
RANGE {
@Override
public int[] getOldSubtasks(
int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
// 定义最大并行度 1<<15=32768
int maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
// 计算出新 subtask 对应的 KeyGroupRange
final KeyGroupRange newRange =
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
maxParallelism, newNumberOfSubtasks, newSubtaskIndex);
// 计算出 newRange 对应的旧 subtask 索引集合的起点
final int start =
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, oldNumberOfSubtasks, newRange.getStartKeyGroup());
// 计算出 newRange 对应的旧 subtask 索引集合的终点
final int end =
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, oldNumberOfSubtasks, newRange.getEndKeyGroup());
return IntStream.range(start, end + 1).toArray();
}
@Override
public boolean isAmbiguous() {
return true;
}
}
首先看一下 KeyGroupRange 是如何计算的:
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism, int parallelism, int operatorIndex) {
checkParallelismPreconditions(parallelism);
checkParallelismPreconditions(maxParallelism);
Preconditions.checkArgument(
maxParallelism >= parallelism,
"Maximum parallelism must not be smaller than parallelism.");
int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);
}
本质上就是将 maxParallelism 份数据分成 parallelism 堆,返回每堆数据需要包含哪些份数据。
假设 parallelism 为2,最大并行度 1<<15=32768,则 operatorIndex 取值为0或1。
operatorIndex=0时:
start = ((0*32768+2-1) / 2)=0 end = ((0+1)*32768-1) / 2=16383 即KeyGroupRange(startKeyGroup=0, endKeyGroup=16383)
operatorIndex=1时:
start = ((1*32768+2-1) / 2)=16384 end = ((1+1)*32768-1) / 2=32767 即KeyGroupRange(startKeyGroup=16384, endKeyGroup=32767)
接着看一下如何通过 KeyGroupRange 计算出对应的 subtask 的索引集合,即 computeOperatorIndexForKeyGroup 方法:
public static int computeOperatorIndexForKeyGroup(
int maxParallelism, int parallelism, int keyGroupId) {
return keyGroupId * parallelism / maxParallelism;
}
假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2,当 new subtask 的索引为0时:
KeyGroupRange(startKeyGroup=0, endKeyGroup=16383)
根据上述代码计算该 KeyGroupRange 对应的 old subtask 的索引集合:
start = 0*5/32768=0
end = 16383*5/32768=2
即对应的 subtask 集合为 {0, 1, 2}
同样的, 当new subtask 的索引为1时:
KeyGroupRange(startKeyGroup=16384, endKeyGroup=32767)
start = 16384*5/32768=2
end = 32767*5/32768=4
即对应的 subtask 集合为 {2, 3, 4}
UNSUPPORTED
UNSUPPORTED {
@Override
public int[] getOldSubtasks(
int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
// 直接抛出异常
throw new UnsupportedOperationException(
"Cannot rescale the given pointwise partitioner.n"
+ "Did you change the partitioner to forward or rescale?n"
+ "It may also help to add an explicit shuffle().");
}
}
本文到此结束,感谢阅读!



