Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
下面例子实现了FilterFunction接口:
DataStreamflinkTweets = tweets.filter(new FlinkFilter()); public static class FlinkFilter implements FilterFunction { @Override public boolean filter(String value) throws Exception { return value.contains("flink"); } }
还可以将函数实现成匿名类
DataStreamflinkTweets = tweets.filter( new FilterFunction () { @Override public boolean filter(String value) throws Exception { return value.contains("flink"); } } );
我们filter的字符串"flink"还可以当作参数传进去。
DataStream5.5.2 匿名函数(Lambda Functions)tweets = env.readTextFile("INPUT_FILE "); DataStream flinkTweets = tweets.filter(new KeyWordFilter("flink")); public static class KeyWordFilter implements FilterFunction { private String keyWord; KeyWordFilter(String keyWord) { this.keyWord = keyWord; } @Override public boolean filter(String value) throws Exception { return value.contains(this.keyWord); } }
DataStreamtweets = env.readTextFile("INPUT_FILE"); DataStream flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );



