栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink源码解析系列--SubtaskStateMapper枚举类

Flink源码解析系列--SubtaskStateMapper枚举类

本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.

假如 Flink 任务某个算子的并行度为4,该算子的状态做了 checkpoint 后任务停止。该算子的并行度调整为2后重新启动任务,新 task 中的各个 subtask 需要从 checkpoint 中哪些旧的 subtask 中获取数据呢?

Flink 提供了 SubtaskStateMapper 枚举类来定义上述行为。

其核心方法为:

public abstract int[] getOldSubtasks(
		int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks);

其中,newSubtaskIndex 为新 subtask 的索引,oldNumberOfSubtasks 为旧 subtask 的数量,newNumberOfSubtasks 为新 subtask 的数量,返回值为旧 subtask 索引的数组。

SubtaskStateMapper 提供了 ARBITRARY、ROUND_ROBIN、FIRST、FULL、RANGE、UNSUPPORTED 等6类实现。

ARBITRARY
ARBITRARY {
	@Override
	public int[] getOldSubtasks(
			int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
		return ROUND_ROBIN.getOldSubtasks(
				newSubtaskIndex, oldNumberOfSubtasks, newNumberOfSubtasks);
	}
}

本质上调用的是 ROUND_ROBIN 的方法实现。

ROUND_ROBIN

旧 subtask 的数据会依次轮询发送到新 subtask 上。

ROUND_ROBIN {
	@Override
	public int[] getOldSubtasks(
			int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
		final IntArrayList subtasks =
				new IntArrayList(oldNumberOfSubtasks / newNumberOfSubtasks + 1);
		for (int subtask = newSubtaskIndex;
				subtask < oldNumberOfSubtasks;
				subtask += newNumberOfSubtasks) {
			subtasks.add(subtask);
		}
		return subtasks.toArray();
	}
}

缩容场景,假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2。

扩容场景,设 oldNumberOfSubtasks 为2,newNumberOfSubtasks 为4。

FIRST

所有旧 subtask 的数据均发送到第1个新 subtask 上。

FIRST {
	@Override
	public int[] getOldSubtasks(
			int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
		// 如果新 subtask 的索引为0,则返回所有旧 subtask 的索引集合
		// 如果新 subtask 的索引不为0,则直接返回 EMPTY,即 int[0] 空数组
		return newSubtaskIndex == 0 ? IntStream.range(0, oldNumberOfSubtasks).toArray() : EMPTY;
	}
}

private static final int[] EMPTY = new int[0];

假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2。

FULL

所有旧 subtask 的数据均发送到每1个新 subtask 上。

FULL {
	@Override
	public int[] getOldSubtasks(
			int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
		// 所有新 subtask 均对应所有旧 subtask 的索引集合
		return IntStream.range(0, oldNumberOfSubtasks).toArray();
	}

	@Override
	public boolean isAmbiguous() {
		return true;
	}
}

假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2。

RANGE

旧 subtask 的数据通过索引段的形式发送到新的 subtask 上。

RANGE {
	@Override
	public int[] getOldSubtasks(
			int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
		// 定义最大并行度 1<<15=32768
		int maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
		// 计算出新 subtask 对应的 KeyGroupRange
		final KeyGroupRange newRange =
				KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
						maxParallelism, newNumberOfSubtasks, newSubtaskIndex);
		// 计算出 newRange 对应的旧 subtask 索引集合的起点
		final int start =
				KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
						maxParallelism, oldNumberOfSubtasks, newRange.getStartKeyGroup());
		// 计算出 newRange 对应的旧 subtask 索引集合的终点
		final int end =
				KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
						maxParallelism, oldNumberOfSubtasks, newRange.getEndKeyGroup());
		return IntStream.range(start, end + 1).toArray();
	}

	@Override
	public boolean isAmbiguous() {
		return true;
	}
}

首先看一下 KeyGroupRange 是如何计算的:

public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
		int maxParallelism, int parallelism, int operatorIndex) {

	checkParallelismPreconditions(parallelism);
	checkParallelismPreconditions(maxParallelism);

	Preconditions.checkArgument(
			maxParallelism >= parallelism,
			"Maximum parallelism must not be smaller than parallelism.");

	int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
	int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
	return new KeyGroupRange(start, end);
}

本质上就是将 maxParallelism 份数据分成 parallelism 堆,返回每堆数据需要包含哪些份数据。

假设 parallelism 为2,最大并行度 1<<15=32768,则 operatorIndex 取值为0或1。

operatorIndex=0时:

start = ((0*32768+2-1) / 2)=0
end = ((0+1)*32768-1) / 2=16383

即KeyGroupRange(startKeyGroup=0, endKeyGroup=16383)

operatorIndex=1时:

start = ((1*32768+2-1) / 2)=16384
end = ((1+1)*32768-1) / 2=32767

即KeyGroupRange(startKeyGroup=16384, endKeyGroup=32767)

接着看一下如何通过 KeyGroupRange 计算出对应的 subtask 的索引集合,即 computeOperatorIndexForKeyGroup 方法:

public static int computeOperatorIndexForKeyGroup(
		int maxParallelism, int parallelism, int keyGroupId) {
	return keyGroupId * parallelism / maxParallelism;
}

假设 oldNumberOfSubtasks 为5,newNumberOfSubtasks 为2,当 new subtask 的索引为0时:

KeyGroupRange(startKeyGroup=0, endKeyGroup=16383)

根据上述代码计算该 KeyGroupRange 对应的 old subtask 的索引集合:

start = 0*5/32768=0
end = 16383*5/32768=2

即对应的 subtask 集合为 {0, 1, 2}

同样的, 当new subtask 的索引为1时:

KeyGroupRange(startKeyGroup=16384, endKeyGroup=32767)

start = 16384*5/32768=2
end = 32767*5/32768=4

即对应的 subtask 集合为 {2, 3, 4}

UNSUPPORTED
UNSUPPORTED {
	@Override
	public int[] getOldSubtasks(
			int newSubtaskIndex, int oldNumberOfSubtasks, int newNumberOfSubtasks) {
		// 直接抛出异常
		throw new UnsupportedOperationException(
				"Cannot rescale the given pointwise partitioner.n"
						+ "Did you change the partitioner to forward or rescale?n"
						+ "It may also help to add an explicit shuffle().");
	}
}

本文到此结束,感谢阅读!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761834.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号