Learning Flink (18): Unit Testing

Updated for Flink 1.9.0

Flink provides flink-test-utils, a JUnit-based library of unit-testing utilities.

Dependencies

Edit pom.xml and add the following dependencies:

<dependency>  
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.9.0</version>
  <scope>test</scope>
</dependency>  
<dependency>  
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>1.9.0</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>  
<dependency>  
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>  

Note: the test JARs org.apache.flink:flink-runtime_2.11:tests:1.9.0 and org.apache.flink:flink-streaming-java_2.11:tests:1.9.0 are required, so these two dependencies must set the classifier to tests.

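Unit Testing Functions

For a stateless Function that does not depend on time, it is enough to supply an input and verify the output.

Define a Function that filters even numbers: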

import org.apache.flink.api.common.functions.FilterFunction;

public class EvenNumberFilterFunction implements FilterFunction<Long> {

    // Keeps only even inputs.
    @Override
    public boolean filter(Long input) throws Exception {
        return input % 2 == 0L;
    }

}

Write the unit test:

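import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class EvenNumberFilterFunctionTest {

    @Test
    public void testEvenNumber() throws Exception {
        EvenNumberFilterFunction filter = new EvenNumberFilterFunction();

        // Odd input is rejected, even input is kept.
        assertFalse(filter.filter(1L));
        assertTrue(filter.filter(2L));
    }

}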
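Because the filter does not touch the Flink runtime, its edge cases can also be reasoned about in plain Java. The following is only a sketch: the class EvenNumberCases and its methods are hypothetical, and isEven() duplicates the expression from filter() rather than calling the Flink class, so that it compiles without Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in: mirrors the body of
// EvenNumberFilterFunction.filter() so edge cases can be checked without Flink.
final class EvenNumberCases {

    // Same expression as in the Function above.
    static boolean isEven(Long input) {
        return input % 2 == 0L;
    }

    // Inputs paired with the result the filter is expected to return.
    static Map<Long, Boolean> expectations() {
        Map<Long, Boolean> cases = new LinkedHashMap<>();
        cases.put(0L, true);
        cases.put(-3L, false);            // in Java, -3 % 2 is -1, which is still non-zero
        cases.put(-4L, true);
        cases.put(Long.MAX_VALUE, false); // 2^63 - 1 is odd
        cases.put(Long.MIN_VALUE, true);  // -2^63 is even
        return cases;
    }

    // Returns true when every case matches its expected result.
    static boolean allCasesPass() {
        for (Map.Entry<Long, Boolean> c : expectations().entrySet()) {
            if (isEven(c.getKey()) != c.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

In a real test the same inputs would be passed to filter() and checked with assertTrue and assertFalse, as in the test above.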

A Function that is stateful or depends on time is harder to test, because it relies on the Flink runtime. Fortunately, Flink provides test harnesses for this kind of Function:

  • OneInputStreamOperatorTestHarness tests operators on a DataStream;
  • KeyedOneInputStreamOperatorTestHarness tests operators on a KeyedStream;
  • TwoInputStreamOperatorTestHarness tests operators on ConnectedStreams built from two DataStreams;
  • KeyedTwoInputStreamOperatorTestHarness tests operators on ConnectedStreams built from two KeyedStreams.

Define a counting Function:

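import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountFunction extends KeyedProcessFunction<String, String, Long> {

    // Running count for the current key, held in keyed state.
    private ValueState<Long> countValueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Long> countVSD = new ValueStateDescriptor<>(
                "count",
                Types.LONG
        );
        this.countValueState = getRuntimeContext().getState(countVSD);
    }

    @Override
    public void processElement(String value, Context context, Collector<Long> collector) throws Exception {
        // value() returns null until the first update for the current key.
        Long stored = this.countValueState.value();
        long count = (stored == null) ? 0L : stored;

        count++;

        this.countValueState.update(count);

        collector.collect(count);
    }

}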

Write the unit test:

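import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class CountFunctionTest {

    private KeyedOneInputStreamOperatorTestHarness<String, String, Long> testHarness;

    private CountFunction countFunction;

    @Before
    public void setupTestHarness() throws Exception {
        // ①
        this.countFunction = new CountFunction();
        this.testHarness = new KeyedOneInputStreamOperatorTestHarness<String, String, Long>(
                new KeyedProcessOperator<>(this.countFunction),
                (KeySelector<String, String>) input -> input,
                Types.STRING
        );

        // ②
        this.testHarness.open();
    }

    @Test
    public void testCountFunction() throws Exception {
        // ③ (element, timestamp)
        testHarness.processElement("a", 100L);
        testHarness.processElement("a", 101L);
        testHarness.processElement("a", 102L);

        // ④ each output entry is a StreamRecord wrapping the emitted count
        List<Object> counts = new ArrayList<>();
        for (Object record : testHarness.getOutput()) {
            counts.add(((StreamRecord<?>) record).getValue());
        }
        assertEquals(Arrays.asList(1L, 2L, 3L), counts);
    }

}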

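① Create a TestHarness for the Function under test.

② Call the TestHarness's open() method, which initializes the internal state.

③ Feed in the test data.

④ Retrieve the output so it can be checked.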
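The key selector in the test uses the element itself as the key, so the three "a" records share one piece of state and the emitted counts should be 1, 2, 3. The sketch below models that reasoning in plain Java. It is an illustration only: KeyedCountModel is a hypothetical class, and the HashMap merely stands in for keyed ValueState; it is not how Flink stores state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of CountFunction: a HashMap stands in for
// the per-key ValueState. An illustration, not Flink's implementation.
final class KeyedCountModel {

    private final Map<String, Long> countByKey = new HashMap<>();

    // Mirrors processElement(): read the count for this key, increment, store, emit.
    long process(String key) {
        long count = countByKey.getOrDefault(key, 0L) + 1;
        countByKey.put(key, count);
        return count;
    }

    // Feeds all inputs through one model and collects the emitted counts in order.
    static List<Long> run(String... inputs) {
        KeyedCountModel model = new KeyedCountModel();
        List<Long> output = new ArrayList<>();
        for (String input : inputs) {
            output.add(model.process(input));
        }
        return output;
    }
}
```

Under this model, run("a", "a", "a") yields [1, 2, 3], while run("a", "b", "a") yields [1, 1, 2] because each key keeps its own count. Feeding a second key through the real harness would be a natural extra test case.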

Unit Testing Jobs

Flink provides a local cluster environment for testing a complete Job.

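import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class IntegrationTest {

    // ①
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(2)
                .setNumberTaskManagers(1)
                .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ② define the pipeline on env, then run it

        env.execute();
    }

}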

① Create a MiniClusterWithClientResource instance, which starts a local cluster to run the Job.

② Execute the Job.

References