栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink单元测试用法讲解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink单元测试用法讲解

本文根据flink官方提供的测试方法进行编写,重点在于展示如何实现Flink任务的单元测试。

测试是每个软件开发过程中不可或缺的一部分,单元测试的好处有很多,比如:

  • 确保单个方法正常运行;
  • 如果修改了方法代码,只需确保其对应的单元测试通过;
  • 测试代码本身就可以作为示例代码;
  • 可以自动化运行所有的测试并获得分析报告。

除上述好处之外,其本身也是规范化流程的一部分。那咱废话不多说,进入正题

测试用户自定义函数 对无状态、无时间限制的UDF进行单元测试

我们以两个无状态的MapFunction和FlatMapFunction为例,实现相同的累加功能,并对该功能进行测试,下面分别是两个Function的具体实现

public class IncrementMapFunction implements MapFunction {
    @Override
    public Integer map(Integer value) throws Exception {
        return value + 1;
    }
}
public class IncrementFlatMapFunction implements FlatMapFunction {
    @Override
    public void flatMap(Integer value, Collector out) throws Exception {
        out.collect(value + 1);
    }
}

MapFunction的单元测试比较简单,直接使用测试框架就可以进行测试,测试代码如下

@Test
public void testIncrement() throws Exception {
	// instantiate your function
	IncrementMapFunction incrementer = new IncrementMapFunction();
	// call the methods that you have implemented
	Assert.assertEquals(3, incrementer.map(2).intValue());
}

FlatMapFunction由于使用org.apache.flink.util.Collector收集结果,因此在进行测试时需要提供Collector的模拟对象,有两种方法可以提供模拟对象,一种是通过Mock,另一种是通过ListCollector。在使用Mockito时,非SpringBoot项目需要手动引入依赖


	org.mockito
	mockito-core
	${mockito.version}
	test

// use Mock to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
    // instantiate your function
    IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
    Collector collector = mock(Collector.class);
    // call the methods that you have implemented
    incrementer.flatMap(2, collector);
    //verify collector was called with the right output
    Mockito.verify(collector, times(1)).collect(3);
}

// use ListCollector to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
    // instantiate your function
    IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
    List list = new ArrayList<>();
    ListCollector collector = new ListCollector<>(list);
    // call the methods that you have implemented
    incrementer.flatMap(2, collector);
    //verify collector was called with the right output
    Assert.assertEquals(Collections.singletonList(3), list);
}
对有状态或及时UDF和用户自定义算子进行单元测试

对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:

  • OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
  • KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
  • TwoInputStreamOperatorTestHarness (f适用于两个 DataStream 的 ConnectedStreams 算子)
  • KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)

要使用测试工具,还需要一组其他的依赖项,如果要为使用 DataStream API 构建的作业开发测试用例,则需要添加以下依赖项:


    org.apache.flink
    flink-test-utils
    1.15.0
    test

需要注意的是1.15以前版本依赖不同于上述依赖。早期版本添加以下依赖项


    org.apache.flink
    flink-test-utils_${scala.version}
    ${flink.version}
    test

该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。如果想本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,还要添加以下依赖项(该依赖是1.15.0版本中添加的,早期版本中没有该依赖):


    org.apache.flink
    flink-table-test-utils
    1.15.0
    test

测试Flink作业
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/874689.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号