- 前言
- 执行输出工具
- 打印执行计划
- 执行计划可视化工具
- 数据传输策略:ship_strategy
- 案例
Flink 的优化器会根据诸如数据量或集群机器数等不同的参数自动地为你的程序选择执行策略。但在大多数情况下,准确地了解 Flink 会如何执行你的程序是很有帮助的。
执行输出工具 打印执行计划- flink table 通过explainSql api输出
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tEnv = StreamTableEnvironment.create(env, blinkStreamSettings); ... String query = "insert into print SELECtn" + " user_id,n" + " product,n" + " amountn" + " FROM orders"; tEnv.explainSql(query);
使用EXPLAIN PLAN FOR 输出执行计划
... String query = "EXPLAIN PLAN FOR" + " insert into print SELECtn" + " user_id,n" + " product,n" + " amountn" + " FROM orders"; tEnv.executeSql(query).print();
输出:
... == Physical Execution Plan == Stage 1 : Data Source content : Source: Custom File source Stage 2 : Operator content : CsvTableSource(read fields: user_id, product, amount) ship_strategy : REBALANCE Stage 3 : Operator content : SourceConversion(table=[default_catalog.default_database.orders, source: [CsvTableSource(read fields: user_id, product, amount)]], fields=[user_id, product, amount]) ship_strategy : FORWARD Stage 4 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.print], fields=[user_id, product, amount]) ship_strategy : FORWARD执行计划可视化工具
以下代码展示了如何在你的程序中打印 JSON 格式的执行计划:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ... System.out.println(env.getExecutionPlan());
可以通过如下步骤可视化执行计划:
- 使用你的浏览器打开可视化工具网站,
- 将 JSON 字符串拷贝并粘贴到文本框中,
- 点击 draw 按钮。
完成后,详细的执行计划图会在网页中呈现。
- Web 界面
Flink 提供了用于提交和执行任务的 Web 界面。该界面是 JobManager Web 界面的一部分,起到管理监控的作用,默认情况下运行在 8081 端口。
可视化工具可以在执行 Flink 作业之前展示执行计划图,你可以据此来指定程序的参数。
数据传输策略:ship_strategy分区策略统一实现StreamPartitioner抽象类,其接口为ChannelSelector。ChannelSelector来定义如何选择数据通道。
public interface ChannelSelector{ void setup(int numberOfChannels); int selectChannel(T record); boolean isBroadcast(); }
HASH[…]
BinaryRowData类型数据分析器
- 实现类:BinaryHashPartitioner
BROADCAST
广播所有输出通道的分区器,将消息发送到下游每个数据输出通道
- 实现类:BroadcastPartitioner
- 触发情况:数据流进行DataStream:broadcast对下游算子进行广播
CUSTOM
分区器,用于在键上使用用户定义的分区器函数选择数据输出通道
- 实现类:CustomPartitionerWrapper
- 触发情况:对数据流进行DataStream#partitionCustom自定义数据分区
FORWARD
仅将元素转发给本地运行的下游操作的分区器。
- 实现类:ForwardPartitioner
- 触发情况:
- 上下游并行度相同
- 算子:flatMap、filter、map,的one-to-one模式
GLOBAL
将所有元素发送到子任务ID为0的下游运算符的分区器。可能会在应用程序中造成严重的性能瓶颈
- 实现类:GlobalPartitioner
- 触发情况:对数据流进行 DataStream#global 操作
HASH
分区器根据key组索引选择目标通道。
- 实现类:KeyGroupStreamPartitioner
- 触发情况:对数据流进行keyBy/window、keyBy/reduce操作
REBALANCE
通过在输出通道中循环平均分配数据的分区器。
- 实现类:RebalancePartitioner
- 触发情况:上下游算子并行度不同,默认进行REBALANCE
RESCALE
在不同并行的情况下,通过在输出通道中循环平均分配数据的分区器。以循环方式将输出元素均匀地分布到下一个操作的实例子集。
- 实现类:RescalePartitioner
- 触发情况:对数据流进行 DataStream#rescale 操作
SHUFFLE
通过随机选择一个输出通道来平均分配数据的分区器。
- 实现类:ShufflePartitioner
- 触发情况:对数据流进行 DataStream#shuffle 操作
- 同步作业:
public class WordCountMain {
public static void main(String[] args) throws Exception {
//创建流运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//输入数据
DataStream dataStreamSource = env.fromElements("Hello my name is li", "Hello my name is li", "name is li");
dataStreamSource
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] splits = value.toLowerCase().split("\W+");
for (String split : splits) {
if (split.length() > 0) {
out.collect(new Tuple2<>(split, 1));
}
}
}
})
.setParallelism(2)
.filter((Objects::nonNull))
.setParallelism(2)
.partitionCustom((key, numPartitions) -> key == null ? 0 : Math.abs(key.hashCode()) % numPartitions,new KeySelector,String>() {
@Override
public String getKey(Tuple2 value) throws Exception {
return value.f0;
}
})
.map(new MapFunction, Tuple2>() {
@Override
public Tuple2 map(Tuple2 value) throws Exception {
return new Tuple2<>(value.f0.toLowerCase(), value.f1);
}
})
//根据key,将数据发送到不同subTasks
.keyBy((value -> value.f0))
.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
})
.setParallelism(2)
.shuffle()
.print()
.setParallelism(2);
System.out.println(env.getExecutionPlan());
}
}
- 执行计划



