Summary
1. map
2. flatMap
3. filter
4. keyBy
5. reduce
6. window and the aggregate function
7. windowAll
8. The apply of window
9. window reduce
First, it has to be said that every operator is stateful, and operator state is the foundation of Flink's failure recovery. The state an operator accumulates while executing is stored in a state backend; state can be accessed by the program, and we can even write code to access it ourselves. Broadcast, for example, relies on this: a stream is broadcast first and then read back through a state handle.
It is fair to say that understanding operator state is at the heart of learning Flink; see my other articles for how state is stored. One more remark: the truly meaningful data while Flink runs is the state data, and state data is exactly the intermediate results — the intermediate result of each operator's computation is state. This chapter covers only the common operators and not state itself; it is mentioned here to remind the reader why understanding Flink state matters.
1. map
map applies a transformation to every element of type T in the stream and returns a new element of type R; the stream of R elements then flows on as a new stream.
Below is the definition of the Scala version of map:
Creates a new DataStream by applying the given function to every element of this DataStream.
def map[R: TypeInformation](fun: T => R): DataStream[R] = {...details elided...}
Now the map function itself: the parameter definition fun: T => R means the argument is a user-supplied function that takes an element of type T from the stream and, after processing, returns an element of type R.
Example:
ds.map(x => {
  x + 1
})
Below is the source definition of the Java version of map:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {...}
The parameter is an implementation of the MapFunction interface; opening that interface's source shows:
@Public
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
    /**
     * The mapping method. Takes an element from the input data set and transforms it into exactly
     * one element.
     *
     * @param value The input value.
     * @return The transformed value
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    O map(T value) throws Exception;
}
In Java, an interface annotated with @FunctionalInterface that contains exactly one abstract method fits the definition of a lambda expression, so the call can be written in lambda style. Example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
    .keyBy(value -> value.f0)
    .map(value -> value.getField(0))
    .print();
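Outside of Flink, the one-in-one-out contract of map can be imitated with plain java.util.stream. This is only an analogy of the operator's semantics, not Flink API:

```java
import java.util.List;
import java.util.stream.Collectors;

class MapSketch {
    // Same contract as Flink's MapFunction<T, O>: exactly one output element per input element.
    static List<Long> mapPlusOne(List<Long> in) {
        return in.stream().map(x -> x + 1).collect(Collectors.toList());
    }
}
```

Every input element yields exactly one output element; that strictness is what distinguishes map from flatMap.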
2. flatMap
Given a single input element, flatMap may produce zero, one, or more elements from it.
Java version:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
Scala version:
dataStream.flatMap { str => str.split(" ") }
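The zero-or-more contract of flatMap can likewise be imitated with plain java.util.stream (an analogy only, not Flink API); splitting a line into words mirrors the example above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapSketch {
    // One input line may yield zero, one, or many words -- the flatMap contract.
    static List<String> splitWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }
}
```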
3. filter
Evaluates each element with user-defined logic: return true if the element should keep flowing downstream, or false to discard it:
Java version:
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
Scala version:
dataStream.filter { _ != 0 }
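The same true-keeps/false-drops contract, sketched with plain java.util.stream rather than Flink:

```java
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {
    // true lets the element continue downstream; false drops it.
    static List<Integer> dropZeros(List<Integer> in) {
        return in.stream().filter(v -> v != 0).collect(Collectors.toList());
    }
}
```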
4. keyBy
Logically splits the stream into disjoint partitions; all elements with the same key are assigned to the same partition. Different partitions are handled by different tasks; under the hood this uses hash partitioning.
Since partitioning is hash based, an object whose class does not implement its own hashCode method cannot be used as a key. Nor can any kind of Array be used as a key.
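Why hashCode matters can be sketched in a few lines. This is a deliberately simplified illustration (Flink's real assignment murmur-hashes the key's hashCode into key groups), but the principle is the same: the partition is derived from hashCode, so equal keys must hash identically:

```java
class KeyBySketch {
    // Simplified partition assignment: derive a partition index from the key's hashCode.
    // A key type without a proper hashCode (or an array, whose hashCode is identity based)
    // would scatter equal keys across partitions, which is why Flink rejects such keys.
    static int partitionFor(Object key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }
}
```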
Source definition:
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {...}
As you can see, keyBy takes an implementation of KeySelector; here is the KeySelector interface:
@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
    KEY getKey(IN value) throws Exception;
}
Examples:
Java version:
dataStream.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
    @Override
    public String getKey(Tuple2<String, Long> value) throws Exception {
        return value.getField(0);
    }
});
Scala version:
1. ds.keyBy(new KeySelector[(Long, Long), Long] {
       override def getKey(value: (Long, Long)): Long = value._1
   })
The shortest Scala form is:
2. ds.keyBy(x => x._1)
This behaves the same as the first form. Note, however, that the Scala API explicitly deprecates the position-based variant:
@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = asScalaStream(stream.keyBy(fields: _*))
In other words, the KeySelector style (the first Scala form) is the recommended one.
5. reduce
Performs a "rolling" operation on a stream that has been keyed with keyBy: the current element is combined with the most recent result and the new value is emitted; that new value is then combined with the next element, the result emitted, and so on. The resulting stream is made up of the emitted values. Note that reduce can only be used on a stream after keyBy. In a keyed stream, all elements with the same key are handled by one thread, so if the stream contains several keys, reduce runs as several independent computations whose results are kept per key. A Scala example:
object Test{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L,1L),
(1L,1L),
(4L, 7L),
(4L, 3L),
(1L, 2L)
)).keyBy(fun = new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}).reduce(new ReduceFunction[(Long, Long)] {
override def reduce(value1: (Long, Long), value2: (Long, Long)): (Long, Long) = (value1._1,value1._2+value2._2)
    }).setParallelism(1).writeAsText("D:\\flink\\a.txt")
env.execute("ExampleKeyedState")
}
}
The output is as follows:
(1,3)
(1,8)
(1,9)
(1,10)
(1,12)
(4,7)
(4,10)
You can see in the result that key 1 and key 4 exist as two independent, parallel streams of results.
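The per-key rolling behaviour can be replayed without Flink. The sketch below imitates what reduce does on a keyed stream, keeping one running sum per key (in real Flink this running value lives in keyed state):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingReduceSketch {
    // For each (key, value) pair, update the key's running sum and emit it,
    // mimicking reduce() on a KeyedStream. Each key's aggregate is independent.
    static List<long[]> rollingSum(List<long[]> pairs) {
        Map<Long, Long> state = new HashMap<>();
        List<long[]> out = new ArrayList<>();
        for (long[] p : pairs) {
            long sum = state.merge(p[0], p[1], Long::sum);
            out.add(new long[]{p[0], sum});
        }
        return out;
    }
}
```

Fed the data from the example above, it produces the same per-key values, only in strict input order since this simulation is single-threaded.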
Below is the reduce source; note the phrase "kept per key" in its comment:
/**
* Creates a new [[DataStream]] by reducing the elements of this DataStream
 * using an associative reduce function. An independent aggregate is kept per key.
*/
def reduce(reducer: ReduceFunction[T]): DataStream[T] = {...}
Now look at the ReduceFunction interface source:
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
/**
* The core method of ReduceFunction, combining two values into one value of the same type. The
* reduce function is consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
6. window and the aggregate function
window: note that this function only applies to keyed streams produced by keyBy. It assigns elements to windows, and each window computes once, with results independent per window. In other words, the logic that follows window() is computed independently per key; the windows fire separately on different keys.
aggregate: processes the data of the current window and involves three methods:
- ACC createAccumulator(): initializes the accumulator (return the accumulator you create).
- ACC add(IN value, ACC accumulator): value is the next element arriving in the window, and accumulator is the one created above; add returns a new accumulator of the same type as the one created in the first method.
- OUT getResult(ACC accumulator): executed when the window fires, extracting the result from the accumulator.
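The three-step protocol can be traced in plain Java, computing an average the way the MyAgg demo below does. This is only a sequential simulation of the accumulator lifecycle, not Flink API:

```java
class AggregateLifecycleSketch {
    // The ACC protocol: createAccumulator once, add per window element, getResult on fire.
    static double averageOf(long[] scores) {
        long[] acc = {0L, 0L};                 // createAccumulator(): {sum, count}
        for (long s : scores) {                // add(value, accumulator) for each element
            acc = new long[]{acc[0] + s, acc[1] + 1};
        }
        return (double) acc[0] / acc[1];       // getResult(accumulator) when the window fires
    }
}
```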
For the details of aggregate, see the MyAgg demo further below. First, a simple windowed word count:
object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// val env = StreamExecutionEnvironment.createRemoteEnvironment("LOCALHOST",8081,"D:\IT\Project\FlinkDemo\target\FlinkDemo-1.0-SNAPSHOT.jar")
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
counts.print()
env.execute("Window 333 WordCount")
}
}
To understand how window logic is computed independently per key, look at a Java version:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * AggregateFunction test: compute the average English score of each class. Below, a window
 * is driven by element count: it fires a computation whenever it has seen two elements.
 * CountTrigger.of(2) means the window of the current key fires every time two elements
 * have arrived.
 */
public class TestAggFunctionOnWindow {
private static final Logger logger = LoggerFactory.getLogger(TestAggFunctionOnWindow.class);
public static void main(String[] args) throws Exception {
        // obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the data
        DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);
        // compute the average English score per class
        SingleOutputStreamOperator<Tuple2<String, Double>> ds = input.keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(2))).aggregate(new MyAgg());
// ds.print();
ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
env.execute("TestAggFunctionOnWindow");
}
public static final Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
Tuple3.of("一班", "王五", 3L),
Tuple3.of("二班", "赵六", 4L),
Tuple3.of("二班", "小七", 5L),
Tuple3.of("二班", "小八", 6L),
};
}
class MyAgg implements AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>, Tuple2<String, Double>> {
    /**
     * Create the accumulator that holds the intermediate state:
     * Tuple3<class name, total score, student count>
     */
    @Override
    public Tuple3<String, Long, Long> createAccumulator() {
        return new Tuple3<>("", 0L, 0L);
    }
    /**
     * Add an element to the accumulator and return the new accumulator.
     *
     * @param value the input element
     * @param acc   the accumulator (ACC)
     * @return the new accumulator
     */
    @Override
    public Tuple3<String, Long, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Long, Long> acc) {
        // acc.f0 class name, acc.f1 total score, acc.f2 student count
        // value.f0 class, value.f1 name, value.f2 score
        return new Tuple3<>(value.f0, acc.f1 + value.f2, acc.f2 + 1L);
    }
    @Override
    public Tuple2<String, Double> getResult(Tuple3<String, Long, Long> acc) {
        return new Tuple2<>(acc.f0, ((double) acc.f1) / acc.f2);
    }
    @Override
    public Tuple3<String, Long, Long> merge(Tuple3<String, Long, Long> acc1, Tuple3<String, Long, Long> acc2) {
        // merge is only invoked for merging windows (e.g. session windows); it is not called here
        return new Tuple3<>("", 1L, 1L);
    }
}
The result is as follows:
这是我的自定义输出::4> (一班,1.5)
这是我的自定义输出::2> (二班,4.5)
Analysis of the result:
Notice that keyBy split the data into two partitions by class, and the computation after window() runs independently in each partition. The process:
The first partition sees:
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
and since the window count reaches 2, the computation fires with the result (一班,1.5).
The second partition sees:
Tuple3.of("二班", "赵六", 4L),
Tuple3.of("二班", "小七", 5L),
and likewise the window fires with the result (二班,4.5).
Some readers may have noticed that I did not print with ds.print()
but with ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false)).
If you open the source of print(), you will see:
@PublicEvolving
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}
So print() also calls addSink under the hood; the code above simply passes its own PrintSinkFunction. Reading the source shows that the printed output can carry a custom prefix, which is handy for debugging.
7. windowAll
After keyBy, the data is split by the specified key; elements with the same key are assigned to the same window task (think of it as an independent thread), and the logic after window() runs separately in each of those threads.
windowAll, by contrast, is called without a preceding keyBy: it gathers all keys together, so the parallelism of windowAll can only be 1, whereas window can have several parallel instances.
This distinction is very important; if it is not clear, stop and re-read it.
8. The apply of window
First the source:
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
    TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType());
    return apply(function, resultType);
}
Below is the WindowFunction interface:
/**
 * Base interface for functions that are evaluated over keyed (grouped) windows.
 *
 * @param <IN>  The type of the input value.   // type of the stream elements
 * @param <OUT> The type of the output value.  // type of the emitted elements
 * @param <KEY> The type of the key.           // type of the key
 * @param <W>   The type of {@code Window} that this window function can be applied on.
 *              // the window type, since Window has many implementations
 */
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
apply is used after keyBy and window to process the elements held by the independent per-key window thread.
Here is a demo that, within each window:
- concatenates the class name with itself, and multiplies the score by ten.
When does apply execute? It runs when the window is triggered for computation.
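The firing condition of a count window of size two can be simulated in plain Java (a sketch of the trigger logic, not Flink's implementation): elements are buffered, and each time the buffer reaches the window size, the buffered batch is handed to the apply logic.

```java
import java.util.ArrayList;
import java.util.List;

class CountWindowApplySketch {
    // Buffer elements; every time the buffer reaches `size`, "fire" the window by
    // handing a copy of the buffered elements to the window logic, then clear it.
    static List<List<String>> firedWindows(List<String> elements, int size) {
        List<List<String>> fired = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String e : elements) {
            buffer.add(e);
            if (buffer.size() == size) {
                fired.add(new ArrayList<>(buffer)); // apply() would see this whole batch
                buffer.clear();
            }
        }
        return fired; // a trailing partial buffer never fires and is never emitted
    }
}
```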
Code:
package com.pg.flink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WindowApply {
private static final Logger logger = LoggerFactory.getLogger(WindowApply.class);
public static void main(String[] args) throws Exception {
        logger.info("program starting....");
        // obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the data
        DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);
        // key the stream by class name
        //KeyedStream<Tuple3<String, String, Long>, String> keyedStreams = input.keyBy((KeySelector<Tuple3<String, String, Long>, String>) value -> value.f0);
        KeyedStream<Tuple3<String, String, Long>, String> keyedStreams = input.keyBy(new MyKeySelector());
        WindowedStream<Tuple3<String, String, Long>, String, GlobalWindow> ws = keyedStreams.countWindow(2);
        SingleOutputStreamOperator<Tuple3<String, String, Long>> ds = ws.apply(new MyWindowFunction());
ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
env.execute("TestAggFunctionOnWindow");
}
public static final Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
Tuple3.of("一班", "王五", 3L),
Tuple3.of("二班", "赵六", 4L),
Tuple3.of("二班", "小七", 5L),
Tuple3.of("二班", "小八", 6L),
};
    public static class MyWindowFunction implements WindowFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String, GlobalWindow> {
        @Override
        public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<Tuple3<String, String, Long>> out) throws Exception {
            for (Tuple3<String, String, Long> e : input) {
                out.collect(new Tuple3<>(e.f0 + s, e.f1, e.f2 * 10));
            }
        }
    }
    public static class MyKeySelector implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
The code above builds a count window partitioned by class name. The data contains just two classes, so keyBy produces two independent window-processing threads that run separately. The window fires once it holds two elements, and when it fires, apply processes the data of that window. Each class has three records, while the window fires on every second record. Based on the data below, keying by class name therefore yields two independent window-processing threads:
Window thread one sees:
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
Tuple3.of("一班", "王五", 3L),
When the window fires (two records detected), apply is called on:
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
which become:
Tuple3.of("一班一班", "张三", 10L),
Tuple3.of("一班一班", "李四", 20L),
while Tuple3.of("一班", "王五", 3L) is discarded (its window never fills, so it never fires).
Window thread two sees:
Tuple3.of("二班", "赵六", 4L),
Tuple3.of("二班", "小七", 5L),
Tuple3.of("二班", "小八", 6L),
When the window fires (two records detected), apply is called, and likewise the result is:
Tuple3.of("二班二班", "赵六", 40L),
Tuple3.of("二班二班", "小七", 50L),
while Tuple3.of("二班", "小八", 6L) is discarded.
So the final output of the two independent window threads, which is also the program's final output, is:
这是我的自定义输出::2> (二班二班,赵六,40)
这是我的自定义输出::2> (二班二班,小七,50)
这是我的自定义输出::4> (一班一班,张三,10)
这是我的自定义输出::4> (一班一班,李四,20)
Note: if you use windowAll instead of window, no keyBy-based partitioning happens and all data goes into a single window; in that case apply takes an AllWindowFunction instead of a WindowFunction. The two are very similar, so windowAll's apply is not discussed further here.
9. window reduce
As the name suggests, this runs reduce over window data (split by key): the current element is combined with the next element to produce a new element, which is then combined with the element after that to produce another new element, and so on, until the current window fires. The window's output consists of the values produced along the way.
demo:
package com.pg.flink;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WindowReduceDemo {
    private static final Logger logger = LoggerFactory.getLogger(WindowReduceDemo.class);
    public static void main(String[] args) throws Exception {
        logger.info("program starting....");
        // obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the data
        DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);
        // key the stream by class name
        //KeyedStream<Tuple3<String, String, Long>, String> keyedStreams = input.keyBy((KeySelector<Tuple3<String, String, Long>, String>) value -> value.f0);
        KeyedStream<Tuple3<String, String, Long>, String> keyedStreams = input.keyBy(new WindowReduceDemo.MyKeySelector());
        WindowedStream<Tuple3<String, String, Long>, String, GlobalWindow> ws = keyedStreams.countWindow(2);
        SingleOutputStreamOperator<Tuple3<String, String, Long>> ds = ws.reduce(new MyReduce());
ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
env.execute("TestAggFunctionOnWindow");
}
public static final Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
Tuple3.of("一班", "王五", 3L),
Tuple3.of("二班", "赵六", 4L),
Tuple3.of("二班", "小七", 5L),
Tuple3.of("二班", "小八", 6L),
};
    public static class MyReduce implements ReduceFunction<Tuple3<String, String, Long>> {
        @Override
        public Tuple3<String, String, Long> reduce(Tuple3<String, String, Long> value1, Tuple3<String, String, Long> value2) throws Exception {
            return Tuple3.of(value1.f0 + value2.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
        }
    }
    public static class MyKeySelector implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
countWindow(2) means the window fires a computation every time it has detected two elements.
The result is as follows:
这是我的自定义输出::4> (一班一班,张三李四,3)
这是我的自定义输出::2> (二班二班,赵六小七,9)
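The printed tuples can be checked by replaying MyReduce's pairwise combination by hand. The helper below mirrors that combination on plain string arrays (an illustration, not Flink code): concatenate the class names, concatenate the student names, and sum the scores.

```java
class WindowReduceCheck {
    // The same pairwise combination MyReduce performs within one fired window.
    static String[] combine(String[] a, String[] b) {
        return new String[]{
                a[0] + b[0],                                                 // class names concatenated
                a[1] + b[1],                                                 // student names concatenated
                String.valueOf(Long.parseLong(a[2]) + Long.parseLong(b[2]))  // scores summed
        };
    }
}
```

Combining ("一班", "张三", 1) with ("一班", "李四", 2) reproduces the first printed line.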