Versions
flink: release-1.14, os: ubuntu 16.04, IDE: IDEA
WordCount source code and execution flow overview
In the previous post, Flink job submission source code analysis (1), we noted that the callMainMethod method in flink-client uses reflection to run the entry class of the user code. This post digs further into what that user code does when it executes.
We use the bundled WordCount.jar as the example. The run command is:
bin/flink run -t remote -d ./examples/streaming/WordCount.jar
The WordCount code is as follows:
public static void main(String[] args) throws Exception {
// Checking input parameters
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataStream<String> text = null;
if (params.has("input")) {
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
Preconditions.checkNotNull(text, "Input DataStream should not be null.");
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements(WordCountData.WORDS);
}
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
// execute program
env.execute("Streaming WordCount");
}
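For reference, the Tokenizer used above is the FlatMapFunction from the same example class; it is essentially the following (reproduced from the Flink WordCount example):
//Tokenizer: splits lines into (word, 1) pairs
public static final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        //normalize and split the line into words
        String[] tokens = value.toLowerCase().split("\\W+");
        //emit each word paired with an initial count of 1
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}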
In the code above, when env.execute("Streaming WordCount") runs, the concrete ExecutionEnvironment, here a StreamExecutionEnvironment, submits the job. As the code below shows, the logic is to generate a StreamGraph and then execute it.
//StreamExecutionEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
final StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
return execute(streamGraph);
}
Obtaining and creating the StreamExecutionEnvironment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
This obtains an execution environment (ExecutionEnvironment):
//StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
It is obtained through a StreamExecutionEnvironmentFactory; the contextEnvironmentFactory here was initialized by the client before it executed the user code.
//StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
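Note the resolution order in Utils.resolveFactory: a thread-local factory (set by the client around the user's main method) takes precedence over the static context factory, and only if neither is present does Flink fall back to a local environment. A minimal sketch of that precedence logic (an illustration, not Flink's actual Utils class):
//illustration only: prefer the thread-local factory, then the static one
//(Optional is java.util.Optional)
static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
    final T localFactory = threadLocalFactory.get();
    return Optional.ofNullable(localFactory != null ? localFactory : staticFactory);
}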
[figure: stream-1.png]
When the WordCount code reaches env.execute("Streaming WordCount"), you can see that the text DataStream contains a LegacySourceTransformation:
[figure: stream-2.png]
while counts contains a ReduceTransformation:
[figure: stream-3.png]
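How did these Transformations get onto the streams? Each DataStream wraps a Transformation, and every operator call creates a new one whose input is the current stream's transformation, then registers it with the environment. A simplified sketch of DataStream#doTransform, which flatMap goes through (paraphrased from Flink 1.14; type checks and casts elided):
//simplified from DataStream#doTransform: build a OneInputTransformation whose
//input is the upstream transformation, register it with env, and wrap it in a new stream
OneInputTransformation<T, R> resultTransform =
        new OneInputTransformation<>(
                this.transformation,            //the upstream transformation
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());
getExecutionEnvironment().addOperator(resultTransform);  //env now holds the whole tree
return new SingleOutputStreamOperator<>(environment, resultTransform);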
The logic for obtaining the StreamGraph lives in the getStreamGraph() call on the line final StreamGraph streamGraph = getStreamGraph();. Stepping in:
//StreamExecutionEnvironment
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraph(true);
}
//after the StreamGraph is generated here, the StreamExecutionEnvironment's transformations list is cleared
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
if (transformations.size() <= 0) {
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot execute.");
}
return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
.setSavepointDir(defaultSavepointDirectory)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout)
.setSlotSharingGroupResource(slotSharingGroupResources);
}
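A practical consequence of the transformations.size() check above, combined with clearTransformations in getStreamGraph(true): after env.execute() returns, the list is empty, so a second execute() without defining new operators fails. A small illustration (hypothetical user code):
//hypothetical: two consecutive execute() calls on the same environment
env.fromElements(1, 2, 3).print();
env.execute("first job");   //fine: the print sink registered a transformation
env.execute("second job");  //IllegalStateException: "No operators defined in streaming topology."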
Ultimately the StreamGraphGenerator produces the StreamGraph. Printing the transformations to be translated shows that this job contains 3 Transformations:
[figure: stream-4.png]
Overview of StreamGraph generation

First, env builds a tree of Transformations, stored in a List<Transformation<?>>:

[figure: stream-graph.png]

The final streamGraph is:

[figure: stream-graph1.png]

Generation involves a few main steps, and the core code is in transform(transformation); the generate(), transform(), and translator code is shown further below. First, let's walk through the concrete Transformations one by one.

OneInputTransformation

Entering translate(translator, transform), first look at the attributes of the OneInputTransformation:

[figure: stream-5.png]

In getParentInputIds(transform.getInputs()) we can see that the inputs of the OneInputTransformation are translated first, and its input is the LegacySourceTransformation:

[figure: stream-6.png]

Likewise, its own input would be translated first, but since LegacySourceTransformation has no input, it is translated directly:

[figure: stream-7.png]

shouldExecuteInBatchMode in StreamGraphGenerator decides whether to translate for streaming or for batch; here it is streaming mode. This invokes the LegacySourceTransformationTranslator from the flink-streaming-java module:

[figure: stream-8.png]

After addOperator() executes:

[figure: stream-11.png]

We can see parallelism=1 and maxParallelism=-1. Back in SimpleTransformationTranslator.translateForStreaming(), the next step is configure(transformation, context);. This completes the translation of the LegacySourceTransformation, and we return to the OneInputTransformation:

[figure: stream-12.png]

Like LegacySourceTransformation, it goes through adding a StreamNode, setting the operator's state key, setting the parallelism, and setting the uid, resources, and so on.

The streamNodes now in the streamGraph are the one generated from the LegacySourceTransformation and the one generated from the OneInputTransformation, connected by a streamEdge:

[figure: stream-14.png]

ReduceTransformation

Its input is processed first, and that input is a PartitionTransformation. For it, a virtual partition node is added instead of a StreamNode: the PartitionTransformation's information is saved into a Map (virtualPartitionNodes):

[figure: sream-15.png]

With the PartitionTransformation handled, we move on to the ReduceTransformation itself. As before, a StreamNode is generated first and then a streamEdge is added. Generating this StreamEdge differs slightly from before, because the upstream of the ReduceTransformation is the PartitionTransformation, which is a virtual partition node:

[figure: stream-16.png]

The process goes like this: the StreamNode generated from the OneInputTransformation has id=2, the virtual partition node (VirtualPartitionNode) generated from the PartitionTransformation has virtualId=6, and the StreamNode generated from the ReduceTransformation has id=4. When the ReduceTransformation's translation enters streamGraph.addEdgeInternal, it finds that its upstream id exists in virtualPartitionNodes, so it fetches the virtual partition node's own upstream id, StreamPartitioner, and StreamExchangeMode from virtualPartitionNodes and recursively calls streamGraph.addEdgeInternal. The outputPartitioner, exchangeMode, and related information are saved into the streamEdge, which is added to the outEdges of the StreamNode generated from the OneInputTransformation (Flat Map-2) and the inEdges of the StreamNode generated from the ReduceTransformation (Keyed Aggregation-4):

[figure: stream-15.png]

This completes the translation of the ReduceTransformation; its streamNode is shown below:

[figure: stream-17.png]

LegacySinkTransformation

The flow is the same as for the other Transformations, and the concrete logic closely resembles LegacySourceTransformation's:

[figure: stream-18.png]

Recapping the translation of each Transformation:

OneInputTransformation:
Its input is found to be the LegacySourceTransformation, which generates the Source: Collection Source StreamNode. The OneInputTransformation is then processed, generating the Flat Map StreamNode, and StreamEdge(Source: Collection Source-1_Flat Map-2_0_FORWARD_0) is added to connect the upstream Source: Collection Source with Flat Map. Since upstream and downstream have the same parallelism and no partitioning was specified, the partitioner here is FORWARD.
ReduceTransformation:
Its input is found to be a PartitionTransformation. That Transformation does not generate a StreamNode; it only produces a virtual partition node, recorded in the virtualPartitionNodes property of the StreamGraph object. The ReduceTransformation is then processed, generating the Keyed Aggregation StreamNode. When the streamEdge connecting it to its upstream is added, the upstream is found to be the virtual partition node, so the real upstream StreamNode, Flat Map, is fetched from virtualPartitionNodes, and StreamEdge(Flat Map-2_Keyed Aggregation-4_0_HASH_0) is generated to connect Flat Map and Keyed Aggregation. Here the partitioner is explicitly HASH (see the keyBy sketch after this recap for where it is set).
LegacySinkTransformation:
Its input is found to be the ReduceTransformation, whose node has already been generated. The LegacySinkTransformation is then processed, generating the Sink: Print to Std. Out StreamNode, and StreamEdge(Keyed Aggregation-4_Sink: Print to Std. Out-5_0_FORWARD_0) is added to connect the upstream Keyed Aggregation with Sink: Print to Std. Out. Since upstream and downstream have the same parallelism and no partitioning was specified, the partitioner here is FORWARD.
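Where does that HASH partitioner come from? It is wired up by keyBy long before translation starts. A simplified excerpt of what KeyedStream's constructor builds (paraphrased from Flink 1.14; surrounding constructor plumbing elided):
//simplified: keyBy wraps the upstream transformation in a PartitionTransformation
//carrying a KeyGroupStreamPartitioner -- this later becomes the virtual partition node
new PartitionTransformation<>(
        dataStream.getTransformation(),
        new KeyGroupStreamPartitioner<>(
                keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM));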
Now for the code. generate() does two main things: it initializes and configures the streamGraph, then iterates over all Transformations and translates each one:
//StreamGraphGenerator
public StreamGraph generate() {
//1. initialize and configure the streamGraph
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
streamGraph.setEnableCheckpointsAfterTasksFinish(
configuration.get(
ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
shouldExecuteInBatchMode = shouldExecuteInBatchMode();
configureStreamGraph(streamGraph);
//used to record Transformations that have already been translated
alreadyTransformed = new HashMap<>();
//2. translate each transformation
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
setFineGrainedGlobalStreamExchangeMode(streamGraph);
for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
for (StreamEdge edge : node.getInEdges()) {
edge.setSupportsUnalignedCheckpoints(false);
}
}
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
//StreamGraphGenerator
private Collection<Integer> transform(Transformation<?> transform) { /* shown in the sketch below */ }
transform(transformation) performs the following steps:
1. If the transformation has already been translated, return its transformedId directly; this check is needed because the graph may contain loops.
2. Set the parallelism.
3. Set the SlotSharingGroup.
4. Call transform.getOutputType(), which throws an exception if the output type could not be inferred.
5. Invoke the concrete translator; the built-in translators are all kept in translatorMap, shown below.
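Putting those steps together, a condensed sketch of transform() (abbreviated from Flink 1.14; logging and the parallelism/slot-sharing setup are elided):
//StreamGraphGenerator (condensed sketch)
private Collection<Integer> transform(Transformation<?> transform) {
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);  //the graph may contain loops
    }
    //...parallelism and slot sharing group setup elided...
    transform.getOutputType();  //throws if the output type could not be inferred
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
    Collection<Integer> transformedIds =
            translator != null ? translate(translator, transform) : legacyTransform(transform);
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }
    return transformedIds;
}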
//StreamGraphGenerator
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    //...the other built-in translators are registered the same way (elided)
    translatorMap = Collections.unmodifiableMap(tmp);
}
Finally, the translated transformation is recorded in alreadyTransformed.
//StreamGraphGenerator
private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
    //the inputs are translated first via getParentInputIds(transform.getInputs()),
    //then the slot sharing group and the Context are prepared (elided)
    return shouldExecuteInBatchMode
            ? translator.translateForBatch(transform, context)
            : translator.translateForStreaming(transform, context);
}
//SimpleTransformationTranslator
@Override
public final Collection<Integer> translateForStreaming(final T transformation, final Context context) {
    final Collection<Integer> transformedStreamNodeIds = translateForStreamingInternal(transformation, context);
    configure(transformation, context);
    return transformedStreamNodeIds;
}
//LegacySourceTransformationTranslator
private Collection<Integer> translateInternal(
        final LegacySourceTransformation<OUT> transformation, final Context context) { /* body elided; it calls streamGraph.addLegacySource(...) */ }
Adding the source operator:
//StreamGraph
public <IN, OUT> void addLegacySource(Integer vertexID, /* ...remaining parameters elided... */) {
    addOperator(/* ... */);  //creates and registers the StreamNode
}
private void configure(final T transformation, final Context context) {
final StreamGraph streamGraph = context.getStreamGraph();
final int transformationId = transformation.getId();
StreamGraphUtils.configureBufferTimeout(
streamGraph, transformationId, transformation, context.getDefaultBufferTimeout());
//set the operator uid
if (transformation.getUid() != null) {
streamGraph.setTransformationUID(transformationId, transformation.getUid());
}
if (transformation.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(
transformationId, transformation.getUserProvidedNodeHash());
}
StreamGraphUtils.validateTransformationUid(streamGraph, transformation);
//set resources and validate
if (transformation.getMinResources() != null
&& transformation.getPreferredResources() != null) {
streamGraph.setResources(
transformationId,
transformation.getMinResources(),
transformation.getPreferredResources());
}
final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
if (streamNode != null) {
validateUseCaseWeightsNotConflict(
streamNode.getManagedMemoryOperatorScopeUseCaseWeights(),
transformation.getManagedMemoryOperatorScopeUseCaseWeights());
streamNode.setManagedMemoryUseCaseWeights(
transformation.getManagedMemoryOperatorScopeUseCaseWeights(),
transformation.getManagedMemorySlotScopeUseCases());
}
}
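As a usage note, the uid branch above is what picks up an explicit operator uid assigned in user code, e.g. (hypothetical user snippet; the uid string is made up):
//a user-assigned uid travels through configure() into
//streamGraph.setTransformationUID(transformationId, "tokenizer")
text.flatMap(new Tokenizer()).uid("tokenizer");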
The OneInputTransformation path goes through the one-input translator, which adds the StreamNode, sets the state key and parallelism, and then connects the edges:
//AbstractOneInputTransformationTranslator
protected Collection<Integer> translateInternal(/* parameters elided */) { /* adds the StreamNode, then connects it to its inputs via streamGraph.addEdge(...) */ }
//StreamGraph
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
            new ArrayList<String>(),
            null,
            null);
}
//ReduceTransformationTranslator
@Override
public Collection<Integer> translateForStreamingInternal(
        final ReduceTransformation<IN, KEY> transformation, final Context context) { /* body elided */ }
When addEdgeInternal finds that the upstream id refers to a virtual partition node, it resolves the real upstream node and recurses:
//StreamGraph
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
exchangeMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputNames,
outputTag,
exchangeMode);
}
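For context, the virtualPartitionNodes map read above associates a virtualId with (upstream id, partitioner, exchange mode). Reconstructed from the f0/f1/f2 accesses (a sketch; the real Flink 1.14 method also guards against duplicate ids):
//StreamGraph (sketch)
private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes;

public void addVirtualPartitionNode(
        Integer originalId,
        Integer virtualId,
        StreamPartitioner<?> partitioner,
        StreamExchangeMode exchangeMode) {
    virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, exchangeMode));
}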
//LegacySinkTransformationTranslator
private Collection<Integer> translateInternal(
        final LegacySinkTransformation<IN> transformation, final Context context) { /* body elided; it calls streamGraph.addSink(...) */ }
//StreamGraph
public <IN, OUT> void addSink(Integer vertexID, /* ...remaining parameters elided... */) {
    addOperator(/* ... */);  //same pattern as addLegacySource: creates and registers the StreamNode
}