本文根据flink官方提供的测试方法进行编写,重点在于展示如何实现Flink任务的单元测试。
测试是每个软件开发过程中不可或缺的一部分,单元测试的好处有很多,比如:
- 确保单个方法正常运行;
- 如果修改了方法代码,只需确保其对应的单元测试通过;
- 测试代码本身就可以作为示例代码;
- 可以自动化运行所有的测试并获得分析报告。
除上述好处之外,其本身也是规范化流程的一部分。那咱废话不多说,进入正题
测试用户自定义函数 对无状态、无时间限制的UDF进行单元测试我们以两个无状态的MapFunction和FlatMapFunction为例,实现相同的累加功能,并对该功能进行测试,下面分别是两个Function的具体实现
public class IncrementMapFunction implements MapFunction{ @Override public Integer map(Integer value) throws Exception { return value + 1; } }
public class IncrementFlatMapFunction implements FlatMapFunction{ @Override public void flatMap(Integer value, Collector out) throws Exception { out.collect(value + 1); } }
MapFunction的单元测试比较简单,直接使用测试框架就可以进行测试,测试代码如下
@Test
public void testIncrement() throws Exception {
// instantiate your function
IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
Assert.assertEquals(3, incrementer.map(2).intValue());
}
FlatMapFunction由于使用org.apache.flink.util.Collector收集结果,因此在进行测试时需要提供Collector的模拟对象,有两种方法可以提供模拟对象,一种是通过Mock,另一种是通过ListCollector。在使用Mockito时,非SpringBoot项目需要手动引入依赖
org.mockito mockito-core ${mockito.version} test
// use Mock to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
// instantiate your function
IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
Collector collector = mock(Collector.class);
// call the methods that you have implemented
incrementer.flatMap(2, collector);
//verify collector was called with the right output
Mockito.verify(collector, times(1)).collect(3);
}
// use ListCollector to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
// instantiate your function
IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
List list = new ArrayList<>();
ListCollector collector = new ListCollector<>(list);
// call the methods that you have implemented
incrementer.flatMap(2, collector);
//verify collector was called with the right output
Assert.assertEquals(Collections.singletonList(3), list);
}
对有状态或及时UDF和用户自定义算子进行单元测试
对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:
- OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
- KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
- TwoInputStreamOperatorTestHarness (f适用于两个 DataStream 的 ConnectedStreams 算子)
- KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)
要使用测试工具,还需要一组其他的依赖项,如果要为使用 DataStream API 构建的作业开发测试用例,则需要添加以下依赖项:
org.apache.flink flink-test-utils 1.15.0 test
需要注意的是1.15以前版本依赖不同于上述依赖。早期版本添加以下依赖项
org.apache.flink flink-test-utils_${scala.version} ${flink.version} test
该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。如果想本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,还要添加以下依赖项(该依赖是1.15.0版本中添加的,早期版本中没有该依赖):
测试Flink作业org.apache.flink flink-table-test-utils 1.15.0 test



