目录
StreamGraph源码
上传jar包
生成StreamGraph
生成StreamNode
生成Edge
核心方法
参考
StreamGraph源码
StreamGraph作为Flink最上层的逻辑封装可以理解为用户API的转化的逻辑层,主要是把用户编写的Transformation转换成StreamNode并生成指向上下游的StreamEdge并装载进StreamGraph。接下来主要以Yarn模式为例子。
上传jar包
当客户端submit脚本上传jar包之后,由Flink获取该jar包,并且通过反射调用用户的main函数。
// NOTE: the submission flow is long; the comments below walk through it step by step.
// CliFrontend is the entry point of client-side job submission; the key call is cli.parseAndRun(args).
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
// 1. resolve the conf directory, e.g. /opt/tools/flink-1.12.2/conf
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
// 2. load flink-conf.yaml into a Configuration, e.g.:
// "taskmanager.memory.process.size" -> "1728m"
// "parallelism.default" -> "1"
// "jobmanager.execution.failover-strategy" -> "region"
// "jobmanager.rpc.address" -> "localhost"
// "taskmanager.numberOfTaskSlots" -> "1"
// "jobmanager.memory.process.size" -> "1600m"
// "jobmanager.rpc.port" -> "6123"
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
// 3. load the custom command-line handlers
// NOTE(review): the article stripped the generic here; upstream this is List<CustomCommandLine>.
final List customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
try {
// Build the CliFrontend; command lines are tried in order: GenericCLI > FlinkYarnSessionCli > DefaultCLI
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// Dispatch the submitted command (run/stop/...) via parseAndRun inside the security context
int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
System.exit(retCode);
} catch (Throwable t) {
// Unwrap UndeclaredThrowableException (thrown by reflective proxies) before logging
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
System.exit(31);
}
}
// Inside parseAndRun(args) the matching action method (run, stop, ...) is invoked
// depending on the submitted command.
// Since we are submitting a job, CliFrontend.run is the action taken.
// run() resolves how/where the Flink program is executed (environment, program, config),
// then calls CliFrontend.executeProgram(effectiveConfiguration, program),
// which delegates the actual submission to the ClientUtils utility class.
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
throws ProgramInvocationException {
ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
// ClientUtils.executeProgram (same method name as above, but a different class) builds the
// program's execution environment and class loader, then runs the user program.
// Executes the packaged user program.
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
// User-code class loader, e.g. FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@3439
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
// Remember the current context class loader so it can be restored afterwards
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
// Switch the thread's context class loader to the user-code class loader
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
// log info : Starting program (detached: false)
LOG.info(
"Starting program (detached: {})",
!configuration.getBoolean(DeploymentOptions.ATTACHED));
// Install the environment factories so that getExecutionEnvironment() inside the
// user's main() returns a Context/StreamContext environment wired to this
// configuration instead of a local environment
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
// Invoke the user program's main method via reflection
program.invokeInteractiveModeForExecution();
} finally {
// Always uninstall the context factories, even if the program failed
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
// Restore the original context class loader
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
// Finally, PackagedProgram.invokeInteractiveModeForExecution delegates to the private
// callMainMethod, which reflectively invokes the user's main method;
// mainMethod.invoke(null, (Object) args) is where the user program actually starts running.
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
// mainClass: e.g. org.apache.flink.streaming.examples.socket.SocketWindowWordCount
// args, e.g.:
// 0 = "--port"
// 1 = "9999"
callMainMethod(mainClass, args);
}
// e.g. entryClass = org.apache.flink.streaming.examples.socket.SocketWindowWordCount, args: --port 9999
// NOTE(review): the article stripped the wildcard here; upstream the parameter is Class<?> entryClass.
private static void callMainMethod(Class> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
// The entry class itself must be public, otherwise main cannot be invoked reflectively
if (!Modifier.isPublic(entryClass.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " must be public.");
}
// Looks up: public static void <entryClass>.main(java.lang.String[]) throws java.lang.Exception
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException(
"Could not look up the main(String[]) method from the class "
+ entryClass.getName()
+ ": "
+ t.getMessage(),
t);
}
// main(String[]) must be static ...
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-static main method.");
}
// ... and public
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-public main method.");
}
// The user program actually starts executing here
try {
mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
throw new ProgramInvocationException(
"Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException(
"Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
// Unwrap the exception thrown inside main(); Errors and known Flink exception
// types are rethrown as-is, everything else is wrapped
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {
throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException(
"The main method caused an error: " + exceptionInMethod.getMessage(),
exceptionInMethod);
}
} catch (Throwable t) {
throw new ProgramInvocationException(
"An error occurred while invoking the program's main method: " + t.getMessage(),
t);
}
}
当mainMethod.invoke开始执行的时候,各个operator会生成对应的Transformation等封装的逻辑实例,直到运行到StreamExecutionEnvironment.execute()后,才开始懒执行。类似于Spark中的action算子,才开始真正的执行代码。
生成StreamGraph
// StreamExecutionEnvironment.execute(jobName): builds the StreamGraph via getStreamGraph
// and hands it to the overload that actually submits it.
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
// Generate the StreamGraph and execute it
return execute(getStreamGraph(jobName));
}
// Builds the StreamGraph from the collected transformations via StreamGraphGenerator.generate.
// Fails fast if the user program registered no operators.
public StreamGraph getStreamGraph() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
return StreamGraphGenerator.generate(this, transformations);
}
// StreamGraphGenerator.generate: creates the StreamGraph and walks every Transformation,
// producing StreamNodes and StreamEdges. (Excerpt — trailing code elided by the article.)
public StreamGraph generate() {
// Create the empty StreamGraph instance
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
// Decide between BATCH and STREAMING execution mode
shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
configureStreamGraph(streamGraph);
alreadyTransformed = new HashMap<>();
// NOTE(review): generic stripped by the article; upstream: for (Transformation<?> transformation : transformations)
for (Transformation> transformation: transformations) {
// Generate the StreamNodes and StreamEdges for this transformation
transform(transformation);
}
.........
}
// Ultimately transform(transformation) populates the StreamGraph;
// for a OneInputTransformation it ends up in translateInternal below.
// (Excerpt — middle section elided by the article.)
private Collection translateInternal(
final OneInputTransformation transformation,
final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
final String slotSharingGroup = context.getSlotSharingGroup();
final int transformationId = transformation.getId();
final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// Create the StreamNode and register it in the StreamGraph's streamNodes map
streamGraph.addOperator(
transformationId,
slotSharingGroup,
transformation.getCoLocationGroupKey(),
transformation.getOperatorFactory(),
transformation.getInputType(),
transformation.getOutputType(),
transformation.getName());
.......
for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
// Create the edge and attach it to the upstream and downstream StreamNodes
streamGraph.addEdge(inputId, transformationId, 0);
}
}
这里说明一下,在生成StreamGraph的时候,其中有transformations参数,这个参数主要是在StreamGraphGenerator.generate(this, transformations)的时候进行传递的,它来自StreamExecutionEnvironment中的成员变量 protected final List<Transformation<?>> transformations,每次调用下方的transform方法时都会把生成的Transformation加入该列表。
public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) { OneInputTransformation resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism()); ... getExecutionEnvironment().addOperator(resultTransform); return returnStream; }
生成StreamNode
// StreamGraph.addOperator: chooses the Task class that will later host this operator and
// delegates to the addOperator overload that creates the StreamNode.
// NOTE(review): generics stripped by the article; upstream these parameters are
// StreamOperatorFactory<OUT>, TypeInformation<IN>, TypeInformation<OUT>.
public void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory operatorFactory,
TypeInformation inTypeInfo,
TypeInformation outTypeInfo,
String operatorName) {
// When Tasks are built later, this Class is used to reflectively invoke the
// parameterized constructor that initializes the Task
// (e.g. a map function corresponds to OneInputStreamTask.class)
Class extends AbstractInvokable> invokableClass =
operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
outTypeInfo, operatorName, invokableClass);
}
// StreamGraph.addNode: creates the StreamNode and registers it under its vertex id.
// (Excerpt — trailing code elided by the article.)
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class extends AbstractInvokable> vertexClass,
StreamOperatorFactory> operatorFactory,
String operatorName) {
// Each vertex id may only be registered once
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
// Create the StreamNode. Core data: slotSharingGroup and operatorFactory
// (commonly SimpleUdfStreamOperatorFactory etc. for user-defined operators,
// which wraps the user's userFunction)
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
vertexClass);
streamNodes.put(vertexID, vertex);
.....
}
生成Edge
// StreamGraph.addEdgeInternal: resolves virtual (side-output / partition) upstream nodes
// recursively, then connects the real upstream and downstream StreamNodes with an edge.
// (Excerpt — truncated mid-statement by the article after the ForwardPartitioner assignment.)
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner> partitioner,
List outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
// For a side-output transformation, recurse with the upstream transformation id
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
// The OutputTag identifies one side-output stream
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
// A partition-type transformation is handled the same way
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
// The partitioner is decided by whether upstream and downstream parallelism match.
// ForwardPartitioner and RebalancePartitioner (and the others) differ mainly in
// selectChannel: the former always returns channel index 0; the latter starts from a
// random channel and advances by 1 modulo the channel count.
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner
核心方法
// NOTE(review): duplicated excerpt of the addEdgeInternal method shown above, collapsed
// onto one line and truncated by the article's formatting; see the expanded version for
// the line-by-line commentary.
private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner> partitioner, ListoutputNames, OutputTag outputTag, ShuffleMode shuffleMode) { //如果是sideout类型的transformation,使用上游的transformationId继续调用addEdgeInternal if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; //outputTag标识一个sideout流 if (outputTag == null) { outputTag = virtualSideOutputNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode); //partition类型的transformation同上 } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } shuffleMode = virtualPartitionNodes.get(virtualId).f2; addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode); } else { StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); // If no partitioner was specified and the parallelism of upstream and downstream // operator matches use forward partitioning, use rebalance otherwise. // 分区器由上下游的并行度是否一致决定 // 这里ForwardPartitioner与RebalancePartitioner等的区别主要体现在selectChannel, // 前者直接返会当前channel的index 0 后者为当前Channel个数取随机+1 再对Channel个数取余(另外几个partitioner也实现不同的selectChannel) if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { partitioner = new ForwardPartitioner
核心方法
addOperator:构建streamNodes集合
addEdge:构建边
addEdgeInternal:构建边,在该方法中,决定分区的策略,如果没有指定分区则按照上游和下游算子的并行度是否相同决定是本地分发,还是均匀分发
getJobGraph:生成JobGraph
getStreamingPlanAsJSON:StreamGraph字符串表示形式
参考
Flink1.12源码解读——StreamGraph执行图构建过程_ws0owws0ow的博客-CSDN博客
Flink运行架构详细讲解 - 程序员大本营
Flink之StreamGraph生成源码分析_陪你一起捡蛋壳的博客-CSDN博客_flink streamgraph



