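map
DataStream → DataStream
Takes one element and produces exactly one element.
flatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements.
filter
DataStream → DataStream
Keeps only the elements for which the predicate returns true.
keyBy
DataStream → KeyedStream
Partitions the stream by key: all elements with the same key go to the same partition.
reduce
KeyedStream/WindowedStream/AllWindowedStream → DataStream
Used after keyBy or after window/windowAll. Combines elements pairwise with a ReduceFunction; on a KeyedStream it is a rolling reduce that emits the current result for every incoming element.
window
KeyedStream → WindowedStream
Used after keyBy; groups the elements of each key into windows.
windowAll
DataStream → AllWindowedStream
Used on a non-keyed stream (not after keyBy); groups all elements into windows. This operator is non-parallel: its parallelism is always 1.
apply
WindowedStream/AllWindowedStream → DataStream
Applies a window function to each window as a whole.
union
DataStream* → DataStream
Merges two or more streams of the same type.
join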
DataStream,DataStream → DataStream
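Joins two streams on a key within a common window: every pair of elements whose keys are equal and that fall into the same window is passed to the JoinFunction. Elements without a match produce no output (inner join).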
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});
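A rough way to see what the single-stream operators and the window join above do is to model them on bounded Java lists. The sketch below is plain Java, not Flink code: `BasicOperatorModel`, `Event`, `flatMapFilterMap`, `rollingSumByKey` and `tumblingWindowJoin` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and tumbling windows are assumed to be aligned to 0 with no offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of several DataStream operators (not Flink code).
// Bounded lists stand in for streams; all names are hypothetical.
class BasicOperatorModel {

    // A keyed, timestamped element standing in for a stream record.
    static final class Event {
        final String key;
        final long timestamp; // event time in ms, assumed >= 0
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // flatMap splits each line into words (0..n outputs per input),
    // filter drops empty words, map upper-cases each word (exactly 1 output).
    static List<String> flatMapFilterMap(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // flatMap
                .filter(word -> !word.isEmpty())                 // filter
                .map(String::toUpperCase)                        // map
                .collect(Collectors.toList());
    }

    // keyBy + reduce: a rolling reduce. Each element updates its key's
    // running sum, and the updated sum is emitted immediately.
    static List<String> rollingSumByKey(List<String> keys, List<Integer> values) {
        Map<String, Integer> sumPerKey = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            int sum = sumPerKey.merge(keys.get(i), values.get(i), Integer::sum);
            out.add(keys.get(i) + "=" + sum);
        }
        return out;
    }

    // Start of the tumbling window containing ts (windows assumed aligned to 0).
    static long windowStart(long ts, long windowSize) {
        return ts - (ts % windowSize);
    }

    // Window join: one result per pair of elements with equal keys in the
    // same tumbling window (inner join: unmatched elements are dropped).
    static List<String> tumblingWindowJoin(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                        windowStart(l.timestamp, windowSize) == windowStart(r.timestamp, windowSize);
                if (sameKey && sameWindow) {
                    out.add(l.key + ":" + l.value + "," + r.value); // plays the JoinFunction role
                }
            }
        }
        return out;
    }
}
```

With a 3000 ms window, elements with the same key at 1000 ms and 2000 ms share the window [0, 3000) and are joined, while an element at 3500 ms lands in [3000, 6000). Note also that `rollingSumByKey` emits a result for every input element, mirroring the rolling behavior of reduce on a KeyedStream; a windowed reduce would instead emit once per window.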
Interval Join
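KeyedStream,KeyedStream → DataStream
Joins two keyed streams on equal keys, pairing each element of the first stream with every element of the second whose timestamp lies within a time interval relative to its own timestamp.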
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional: bounds are inclusive by default
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});
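The bound handling of the interval join can be modeled in a few lines. This plain-Java sketch is not Flink code: `IntervalJoinModel`, `TimedEvent`, `inInterval` and `join` are hypothetical names. It assumes, as described above, that both bounds are inclusive unless explicitly made exclusive.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of interval-join matching (not Flink code; names are hypothetical).
class IntervalJoinModel {

    static final class TimedEvent {
        final String key;
        final long timestamp; // event time in ms
        final String value;

        TimedEvent(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // True if rightTs lies within [leftTs + lower, leftTs + upper];
    // each bound is inclusive unless its "exclusive" flag is set.
    static boolean inInterval(long leftTs, long rightTs, long lower, long upper,
                              boolean lowerExclusive, boolean upperExclusive) {
        long from = leftTs + lower;
        long to = leftTs + upper;
        boolean aboveLower = lowerExclusive ? rightTs > from : rightTs >= from;
        boolean belowUpper = upperExclusive ? rightTs < to : rightTs <= to;
        return aboveLower && belowUpper;
    }

    // Pairs every left element with each right element of the same key
    // whose timestamp falls inside the interval around the left timestamp.
    static List<String> join(List<TimedEvent> left, List<TimedEvent> right, long lower, long upper,
                             boolean lowerExclusive, boolean upperExclusive) {
        List<String> out = new ArrayList<>();
        for (TimedEvent l : left) {
            for (TimedEvent r : right) {
                if (l.key.equals(r.key)
                        && inInterval(l.timestamp, r.timestamp, lower, upper, lowerExclusive, upperExclusive)) {
                    out.add(l.key + ":" + l.value + "," + r.value);
                }
            }
        }
        return out;
    }
}
```

With lower = -2, upper = 2 and both bounds exclusive, a left element at t = 10 matches right elements at 9, 10 and 11 only, which is the `leftTs - 2 < rightTs < leftTs + 2` condition from the comment above. With inclusive bounds, 8 and 12 match as well.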
CoGroup
DataStream,DataStream → DataStream
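Groups the elements of both streams by key within a common window and hands each key's two groups to the CoGroupFunction together. Unlike join, a key that appears in only one stream is still passed on, with an empty group for the other stream. The emphasis is on grouping rather than pairing.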
dataStream.coGroup(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
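To contrast coGroup with join, here is a plain-Java model of what a CoGroupFunction receives for a single window. It is a sketch, not Flink code: `CoGroupModel`, `KV` and `coGroup` are hypothetical names, and all elements are assumed to belong to the same window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java model of coGroup within ONE window (not Flink code; names are hypothetical).
class CoGroupModel {

    static final class KV {
        final String key;
        final String value;

        KV(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Produces one result per key seen on EITHER side, combining both groups;
    // a key missing on one side gets an empty group there.
    static List<String> coGroup(List<KV> left, List<KV> right) {
        Map<String, List<String>> leftGroups = groupByKey(left);
        Map<String, List<String>> rightGroups = groupByKey(right);
        TreeSet<String> keys = new TreeSet<>(leftGroups.keySet()); // sorted for stable output
        keys.addAll(rightGroups.keySet());
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            List<String> l = leftGroups.getOrDefault(key, List.of());
            List<String> r = rightGroups.getOrDefault(key, List.of());
            out.add(key + ":" + l + "|" + r); // plays the CoGroupFunction role
        }
        return out;
    }

    private static Map<String, List<String>> groupByKey(List<KV> elements) {
        Map<String, List<String>> groups = new HashMap<>();
        for (KV e : elements) {
            groups.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.value);
        }
        return groups;
    }
}
```

For left elements a=1, a=2, b=3 and right elements a=x, c=y, the output is `a:[1, 2]|[x]`, `b:[3]|[]` and `c:[]|[y]`. Keys `b` and `c` still produce output with an empty group on the missing side, whereas an inner join would emit nothing for them; this is what makes coGroup usable for outer-join logic.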
Connect
DataStream,DataStream → ConnectedStreams
"Connects" two streams while preserving their types (the two types may differ). Connecting allows the two streams to share state.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
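The point that connected streams can share state is easiest to see with a two-input operator whose inputs have different types. The sketch below is plain Java, not Flink code: `ThresholdFilter` is hypothetical, its `processElement1`/`processElement2` methods only loosely mirror Flink's two-input functions, and a plain field stands in for the state Flink would manage. Input 1 is assumed to carry Integer length limits, input 2 String words, and the arrival order is simply the list order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a two-input operator on connected streams (not Flink code).
// Input 1 carries Integer thresholds, input 2 carries String words;
// both inputs read and write the same state.
class ThresholdFilter {

    // State shared by both inputs; starts with no limit.
    private int maxLength = Integer.MAX_VALUE;

    // Input 1: update the shared threshold, emit nothing.
    void processElement1(Integer threshold, List<String> out) {
        maxLength = threshold;
    }

    // Input 2: emit the word only if it passes the current threshold.
    void processElement2(String word, List<String> out) {
        if (word.length() <= maxLength) {
            out.add(word);
        }
    }

    // Replays both inputs in the given arrival order (an assumption: in Flink
    // the interleaving of the two inputs is not guaranteed).
    static List<String> run(List<Object> arrivals) {
        ThresholdFilter operator = new ThresholdFilter();
        List<String> out = new ArrayList<>();
        for (Object element : arrivals) {
            if (element instanceof Integer) {
                operator.processElement1((Integer) element, out);
            } else {
                operator.processElement2((String) element, out);
            }
        }
        return out;
    }
}
```

Given the arrivals "hello", 3, "hi", "hello", 5, "hello", the first "hello" passes (no limit yet), the second is dropped under the limit 3, and the third passes again once the limit becomes 5. The result depends on arrival order precisely because both inputs touch the same state.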
CoMap, CoFlatMap
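ConnectedStreams → DataStream
Operators that apply only to ConnectedStreams: map1/flatMap1 handle elements of the first stream and map2/flatMap2 handle elements of the second; both methods produce the same output type.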
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
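The CoFlatMapFunction example above can be mirrored in plain Java to show that both methods write into a single output stream. `TwoInputFlatMapper` and `CoFlatMapModel` are hypothetical stand-ins, not Flink's interfaces, and an interleaved list of Integers and Strings is assumed to represent the arrival order of the two inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java stand-in for a two-input flatMap (not Flink's API).
interface TwoInputFlatMapper<IN1, IN2, OUT> {
    void flatMap1(IN1 value, Consumer<OUT> out); // called for elements of input 1
    void flatMap2(IN2 value, Consumer<OUT> out); // called for elements of input 2
}

class CoFlatMapModel {

    // Same logic as the CoFlatMapFunction example: Integers become
    // their string form, Strings are split into words.
    static final TwoInputFlatMapper<Integer, String, String> EXAMPLE =
            new TwoInputFlatMapper<Integer, String, String>() {
                @Override
                public void flatMap1(Integer value, Consumer<String> out) {
                    out.accept(value.toString());
                }

                @Override
                public void flatMap2(String value, Consumer<String> out) {
                    for (String word : value.split(" ")) {
                        out.accept(word);
                    }
                }
            };

    // Feeds interleaved elements (Integer = input 1, String = input 2)
    // to the matching method; both append to one output list.
    static List<String> run(List<Object> arrivals) {
        List<String> out = new ArrayList<>();
        for (Object element : arrivals) {
            if (element instanceof Integer) {
                EXAMPLE.flatMap1((Integer) element, out::add);
            } else {
                EXAMPLE.flatMap2((String) element, out::add);
            }
        }
        return out;
    }
}
```

For the arrivals 1, "a b", 22 the output is "1", "a", "b", "22": each Integer yields one element via flatMap1, and each String yields one element per word via flatMap2.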
Iterate
DataStream → IterativeStream → ConnectedStream
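Creates a feedback loop: the output of the iteration body is split in two. One part is fed back to the head of the iteration (via closeWith) and keeps looping; the other part is emitted downstream as normal.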
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* ... */);
// Elements greater than 0 are fed back into the iteration head.
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
// Elements less than or equal to 0 leave the loop and continue downstream.
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
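The feedback loop can be modeled with a queue. This plain-Java sketch is not Flink code: `IterateModel` is hypothetical, and because the original example elides the body of `iteration.map(...)`, the body is assumed to be `value - 1`. The two filters follow the example: results greater than 0 are fed back, results less than or equal to 0 leave the loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the iterate/closeWith feedback loop (not Flink code).
class IterateModel {

    // Hypothetical iteration body; the original example elides the map logic.
    static long body(long value) {
        return value - 1;
    }

    static List<Long> run(List<Long> initial) {
        Deque<Long> pending = new ArrayDeque<>(initial); // head of the iteration
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long result = body(pending.poll());
            if (result > 0) {
                pending.add(result);  // feedback: closeWith(...) sends it back to the head
            } else {
                output.add(result);   // leaves the loop and is emitted downstream
            }
        }
        return output;
    }
}
```

For the input 3, 0, -2 the output is -1, -3, 0: the element 3 circulates through the loop (3 → 2 → 1 → 0) before leaving. The model processes fed-back elements in FIFO order, which is a simplification; Flink does not guarantee how feedback and fresh input are interleaved.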



