Create the initial Flink environment
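import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configure the web UI: serve the REST endpoint / dashboard on port 8848
// (createLocalEnvironmentWithWebUI needs the flink-runtime-web dependency on the classpath)
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8848);
// Create a local Flink execution environment with the web UI enabled
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
environment.setParallelism(4);
// Deprecated since Flink 1.12, where event time is already the default
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// DataDB and TimeDB are the author's custom sources, emitting Person and Times records respectively
DataStream<Person> source = environment.addSource(new DataDB());
source.print("source");
DataStream<Times> timesSource = environment.addSource(new TimeDB());
timesSource.print("timesource");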
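Map: takes one element and produces exactly one element. The textbook example is a map function that doubles every value of the input stream. The code below instead turns each Person into a comma-separated string.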
DataStream<String> map = source.map(new MapFunction<Person, String>() {
    @Override
    public String map(Person person) throws Exception {
        // One Person in, one line out: name,age,sex,id
        return person.getPname() + "," + person.getPage() + "," + person.getPsex() + "," + person.getPid();
    }
});
map.print("map");
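The one-in, one-out contract can be modelled in plain Java, using the doubling function mentioned above. This is a hypothetical sketch (`MapSketch` is not part of Flink). It only mimics what a MapFunction does to each element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the map contract (not Flink code): exactly one output per input element.
final class MapSketch {
    static <I, O> List<O> map(List<I> input, Function<I, O> fn) {
        List<O> out = new ArrayList<>(input.size());
        for (I element : input) {
            out.add(fn.apply(element)); // one element in, one element out
        }
        return out;
    }

    // The "double every value" map function mentioned in the text
    static List<Integer> doubleAll(List<Integer> input) {
        return MapSketch.<Integer, Integer>map(input, x -> x * 2);
    }

    public static void main(String[] args) {
        System.out.println(doubleAll(List.of(1, 2, 3))); // [2, 4, 6]
    }
}
```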
FlatMap: takes one element and produces zero, one or more elements.
DataStream<String> flatMap = map.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // Emit one record per comma-separated field of the input line
        for (String word : s.split(",")) {
            collector.collect(word);
        }
    }
});
flatMap.print("flatMap");
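The sketch below models the "zero, one or many" contract in plain Java, with a callback standing in for Flink's Collector. `FlatMapSketch` is hypothetical. Unlike the Flink example above, it skips empty fields, so a blank line produces zero outputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java model of flatMap: fn may call the "collector" any number of times per input.
final class FlatMapSketch {
    static <I, O> List<O> flatMap(List<I> input, BiConsumer<I, Consumer<O>> fn) {
        List<O> out = new ArrayList<>();
        for (I element : input) {
            fn.accept(element, out::add); // the collector appends to the output
        }
        return out;
    }

    // Split comma-separated lines into fields; an empty line emits nothing
    static List<String> splitFields(List<String> lines) {
        return FlatMapSketch.<String, String>flatMap(lines, (line, collector) -> {
            for (String word : line.split(",")) {
                if (!word.isEmpty()) {
                    collector.accept(word);
                }
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(splitFields(List.of("Tom,20,M", "", "Ann"))); // [Tom, 20, M, Ann]
    }
}
```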
Filter: evaluates a predicate for each element. If it returns true, the element is passed downstream; if it returns false, the element is dropped.
DataStream<Person> filter = source.filter(new FilterFunction<Person>() {
    @Override
    public boolean filter(Person person) throws Exception {
        // Keep only people older than 18
        return person.getPage() > 18;
    }
});
filter.print("filter");
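Here is a plain-Java model of filter using the same "older than 18" condition. `FilterSketch` is a hypothetical name, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of filter: an element is forwarded only if the predicate returns true.
final class FilterSketch {
    static <T> List<T> filter(List<T> input, Predicate<T> predicate) {
        List<T> out = new ArrayList<>();
        for (T element : input) {
            if (predicate.test(element)) { // true: pass downstream; false: drop
                out.add(element);
            }
        }
        return out;
    }

    // Same condition as the FilterFunction above: ages strictly greater than 18
    static List<Integer> adults(List<Integer> ages) {
        return filter(ages, age -> age > 18);
    }

    public static void main(String[] args) {
        System.out.println(adults(List.of(17, 18, 19, 30))); // [19, 30]
    }
}
```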
keyBy: logically partitions the stream so that all records with the same key end up in the same partition. keyBy() is implemented with hash partitioning.
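// Group by the "psex" field. Field-name keyBy is deprecated in newer Flink versions;
// a KeySelector such as keyBy(Person::getPsex) is the recommended form
KeyedStream<Person, Tuple> keyBy = source.keyBy("psex");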
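The core guarantee of hash partitioning is that equal keys always map to the same partition index. The sketch below shows this with a simplified `hash mod parallelism` rule. This is an assumption for illustration: Flink itself first hashes keys into key groups and then maps key groups to subtasks, so its concrete indices differ. `KeyBySketch` is hypothetical. Note that different keys may also share a partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified hash partitioner (not Flink's exact key-group scheme)
final class KeyBySketch {
    // The same key always maps to the same partition index
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route each key to its partition; TreeMap keeps partitions sorted for printing
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, parallelism), p -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Parallelism 4 as in the setup code: all "M" records land together, as do all "F" records
        System.out.println(partition(List.of("M", "F", "M", "F", "M"), 4));
    }
}
```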
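Reduce: applied to a KeyedStream. Within each key group, it combines the current element with the last reduced value and emits the new value. The developer implements the combining logic.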
DataStream<Person> reduce = keyBy.reduce(new ReduceFunction<Person>() {
    @Override
    public Person reduce(Person person, Person t1) throws Exception {
        // person is the previously reduced value, t1 the new element: keep person's fields, sum the ages
        int a = person.getPage() + t1.getPage();
        return new Person(person.getPid(), person.getPname(), person.getPsex(), a);
    }
});
reduce.print("reduce");
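A keyed rolling reduce emits an updated value for every input, not one final value per key. The plain-Java model below makes that visible. `RollingReduceSketch` is hypothetical, and a HashMap stands in for Flink's keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a keyed rolling reduce (not Flink code)
final class RollingReduceSketch {
    static <K, V> List<V> rollingReduce(List<Map.Entry<K, V>> input, BinaryOperator<V> reducer) {
        Map<K, V> state = new HashMap<>(); // last reduced value per key
        List<V> out = new ArrayList<>();
        for (Map.Entry<K, V> e : input) {
            V previous = state.get(e.getKey());
            // First element of a key is emitted unchanged; later ones are reduce(previous, new)
            V reduced = (previous == null) ? e.getValue() : reducer.apply(previous, e.getValue());
            state.put(e.getKey(), reduced);
            out.add(reduced); // one output per input
        }
        return out;
    }

    public static void main(String[] args) {
        // Ages keyed by sex, summed like the ReduceFunction above
        List<Integer> sums = rollingReduce(
                List.of(Map.entry("M", 20), Map.entry("F", 30), Map.entry("M", 25)), Integer::sum);
        System.out.println(sums); // [20, 30, 45]
    }
}
```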
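Fold: similar to Reduce, except that Fold starts from a supplied initial value and folds every element into it, so the result type can differ from the input type. fold is deprecated in later Flink releases (and has since been removed); aggregate is its replacement.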
DataStream<Integer> fold = keyBy.fold(3, new FoldFunction<Person, Integer>() {
    @Override
    public Integer fold(Integer integer, Person o) throws Exception {
        // Rolling sum of ages per key, starting from the initial value 3
        return integer + o.getPage();
    }
});
fold.print("fold");
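The difference from Reduce is only the starting point: each key's accumulator begins at the initial value (3 above). `RollingFoldSketch` is a hypothetical plain-Java model. It assumes an immutable initial value such as an Integer, because the same object is reused for every key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a keyed rolling fold (not Flink code)
final class RollingFoldSketch {
    static <K, T, R> List<R> rollingFold(List<Map.Entry<K, T>> input, R initial, BiFunction<R, T, R> folder) {
        Map<K, R> state = new HashMap<>(); // accumulator per key
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, T> e : input) {
            // A key seen for the first time starts from `initial`
            R updated = folder.apply(state.getOrDefault(e.getKey(), initial), e.getValue());
            state.put(e.getKey(), updated);
            out.add(updated); // one output per input
        }
        return out;
    }

    public static void main(String[] args) {
        // Initial value 3, then add each age, as in the FoldFunction above
        List<Integer> folds = RollingFoldSketch.<String, Integer, Integer>rollingFold(
                List.of(Map.entry("M", 20), Map.entry("F", 30), Map.entry("M", 25)), 3, (acc, age) -> acc + age);
        System.out.println(folds); // [23, 33, 48]
    }
}
```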
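Aggregation: rolling aggregations over the elements that share a key (sum, min, max, minBy, maxBy). Both min and minBy work per key. The difference is that min returns the minimum value of the given field (the other fields of the emitted record are not guaranteed to come from the same element), whereas minBy returns the whole element that holds the minimum value in that field. The same applies to max and maxBy.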
keyBy.sum("page").print("sum");
keyBy.min("page").print("min");
keyBy.max("page").print("max");
keyBy.minBy("page").print("minBy");
keyBy.maxBy("page").print("maxBy");
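The min/minBy difference is easiest to see on a single key group. This plain-Java model is not Flink code, and its nested `Person` is a simplified hypothetical stand-in for the document's Person. Its rolling min keeps the other fields of the first element and only updates the age. Its minBy emits the element that actually holds the minimum, keeping the earlier one on ties, which is Flink's documented default for minBy.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of rolling min vs. minBy within one key group
final class MinVsMinBySketch {
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + ":" + age;
        }
    }

    // min: only the aggregated field changes; the name stays that of the first element
    static List<String> rollingMin(List<Person> group) {
        List<String> out = new ArrayList<>();
        Person acc = null;
        for (Person p : group) {
            acc = (acc == null) ? p : new Person(acc.name, Math.min(acc.age, p.age));
            out.add(acc.toString());
        }
        return out;
    }

    // minBy: the whole element holding the minimum is emitted (earlier element wins ties)
    static List<String> rollingMinBy(List<Person> group) {
        List<String> out = new ArrayList<>();
        Person acc = null;
        for (Person p : group) {
            if (acc == null || p.age < acc.age) {
                acc = p;
            }
            out.add(acc.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Person> group = List.of(new Person("Tom", 30), new Person("Ann", 20), new Person("Bob", 25));
        System.out.println(rollingMin(group));   // [Tom:30, Tom:20, Tom:20]
        System.out.println(rollingMinBy(group)); // [Tom:30, Ann:20, Ann:20]
    }
}
```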
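Window: splits a KeyedStream into time windows per key, for example a tumbling window every 5 seconds. Each key has its own windows.
TumblingEventTimeWindows assigns windows based on the timestamp carried by each event (event time, which requires timestamps and watermarks to be assigned). TumblingProcessingTimeWindows uses the system clock of the machine running the operator (processing time).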
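// 5-second tumbling processing-time windows; the second argument is a 2-second window offset, not a slide
DataStream<String> aggregate = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
        .aggregate(new AggregateFunction<Person, Integer, String>() {
            // Create a new accumulator for each window
            @Override
            public Integer createAccumulator() {
                return 0;
            }
            // Called once for every incoming element: add its age to the accumulator
            @Override
            public Integer add(Person person, Integer integer) {
                return person.getPage() + integer;
            }
            // Called once when the window fires: turn the accumulator into the output
            @Override
            public String getResult(Integer integer) {
                return integer.toString();
            }
            // Merge two accumulators (used when windows are merged, e.g. session windows)
            @Override
            public Integer merge(Integer integer, Integer acc1) {
                return integer + acc1;
            }
        });
aggregate.print("aggregate");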
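To show where the 2-second offset comes in, the sketch below computes each element's tumbling window start as `timestamp - floorMod(timestamp - offset, size)`. This matches the start computation of Flink's `TimeWindow.getWindowStartWithOffset`. It then runs the createAccumulator / add / getResult steps per (key, window). `TumblingWindowSketch` and its `Event` class are hypothetical, and timestamps are plain milliseconds rather than real processing time.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of keyed tumbling windows with an offset plus an AggregateFunction-style accumulator
final class TumblingWindowSketch {
    static final class Event {
        final String key;
        final long timestamp; // milliseconds
        final int age;

        Event(String key, long timestamp, int age) {
            this.key = key;
            this.timestamp = timestamp;
            this.age = age;
        }
    }

    // Start of the window [start, start + size) that contains the timestamp
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    static Map<String, String> aggregate(List<Event> events, long offset, long size) {
        Map<String, Integer> accumulators = new TreeMap<>(); // one accumulator per "key@windowStart"
        for (Event e : events) {
            String window = e.key + "@" + windowStart(e.timestamp, offset, size);
            int acc = accumulators.getOrDefault(window, 0); // createAccumulator() for a new window
            accumulators.put(window, acc + e.age);          // add()
        }
        Map<String, String> results = new TreeMap<>();
        accumulators.forEach((window, acc) -> results.put(window, acc.toString())); // getResult()
        return results;
    }

    public static void main(String[] args) {
        // 5-second windows with a 2-second offset: [2000, 7000), [7000, 12000), ...
        List<Event> events = List.of(
                new Event("M", 2_500, 20), new Event("M", 6_999, 25),
                new Event("M", 7_000, 30), new Event("F", 3_000, 40));
        System.out.println(aggregate(events, 2_000, 5_000)); // {F@2000=40, M@2000=45, M@7000=30}
    }
}
```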
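WindowAll: windows an ordinary (non-keyed) DataStream, so all elements share one global set of windows, for example a tumbling window every 5 seconds. Because the stream is not keyed, all windowing runs in a single task (parallelism 1). The example below uses 20-second sliding windows that advance every 5 seconds.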
// Sliding processing-time windows: 20 seconds long, a new window starts every 5 seconds
DataStream<Person> windowAll = source.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
        .reduce(new ReduceFunction<Person>() {
            @Override
            public Person reduce(Person person, Person t1) throws Exception {
                // Sum the ages of all elements in the window
                Integer s = person.getPage() + t1.getPage();
                return new Person(person.getPid(), person.getPname(), person.getPsex(), s);
            }
        });
windowAll.print("windowAll");
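With sliding windows, one element belongs to several windows: size / slide = 20 / 5 = 4 here. The sketch assigns windows by walking back from the latest window start in steps of `slide`, in the way Flink's sliding window assigners do (offset 0 assumed). It then sums values per window with no key, as windowAll does. `SlidingWindowSketch` is hypothetical and uses millisecond timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of sliding-window assignment for a non-keyed (windowAll) stream
final class SlidingWindowSketch {
    // Starts of all windows [start, start + size) that contain the timestamp
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest window containing it
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // No key: every element contributes to the shared windows it falls into
    static Map<Long, Integer> sumPerWindow(List<Map.Entry<Long, Integer>> events, long size, long slide) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (Map.Entry<Long, Integer> event : events) { // key = timestamp (ms), value = age
            for (long start : windowStarts(event.getKey(), size, slide)) {
                sums.merge(start, event.getValue(), Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(12_000, 20_000, 5_000)); // [10000, 5000, 0, -5000]
        System.out.println(sumPerWindow(List.of(Map.entry(12_000L, 20), Map.entry(16_000L, 30)), 20_000, 5_000));
        // {-5000=20, 0=50, 5000=50, 10000=50, 15000=30}
    }
}
```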
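Window Apply: applies a window function to a window; the function processes all of the window's elements as a whole. There are two kinds of windowed streams: the keyed WindowedStream and the non-keyed AllWindowedStream.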
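DataStream<String> apply = source.keyBy("psex").countWindow(5)
        // Trigger: fire after every 5 elements. countWindow(5) already installs PurgingTrigger.of(CountTrigger.of(5));
        // a bare CountTrigger.of(5) would replace it and never clear the window, so later firings would re-include old elements
        .trigger(PurgingTrigger.of(CountTrigger.of(5)))
        .apply(new WindowFunction<Person, String, Tuple, GlobalWindow>() {
            @Override
            public void apply(Tuple tuple, GlobalWindow globalWindow, Iterable<Person> iterable, Collector<String> collector) throws Exception {
                // Sum the ages of all elements in this window
                int sum = 0;
                for (Person person : iterable) {
                    sum += person.getPage();
                }
                collector.collect(String.valueOf(sum));
            }
        });
apply.print("apply");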
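The plain-Java model below shows both what a window function receives and why the trigger choice matters. The window function sees every buffered element of the window. With `purge = true` the window behaves like countWindow(size) and is cleared after firing. With `purge = false` it models a bare CountTrigger, which resets its counter but keeps the contents. `CountWindowApplySketch` is hypothetical, and window size 2 is used only to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key count windows processed by a whole-window function (sum of ages)
final class CountWindowApplySketch {
    static List<String> apply(List<Map.Entry<String, Integer>> input, int size, boolean purge) {
        Map<String, List<Integer>> contents = new HashMap<>(); // buffered window elements per key
        Map<String, Integer> counts = new HashMap<>();         // elements since the trigger last fired
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            List<Integer> window = contents.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
            window.add(e.getValue());
            int count = counts.merge(e.getKey(), 1, Integer::sum);
            if (count == size) {
                counts.put(e.getKey(), 0); // the count trigger resets its counter when it fires
                int sum = 0;
                for (int age : window) { // the window function sees every element of the window
                    sum += age;
                }
                out.add(e.getKey() + ":" + sum);
                if (purge) {
                    window.clear(); // purging: the next window starts empty
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("M", 10), Map.entry("F", 1), Map.entry("M", 20),
                Map.entry("M", 30), Map.entry("F", 2), Map.entry("M", 40));
        System.out.println(apply(input, 2, true));  // [M:30, F:3, M:70]
        System.out.println(apply(input, 2, false)); // [M:30, F:3, M:100]
    }
}
```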
Window Reduce: applies a ReduceFunction to a WindowedStream; the result is a DataStream.
DataStream<Person> reducewindows = source.keyBy("psex").countWindow(5).reduce(new ReduceFunction<Person>() {
    @Override
    public Person reduce(Person person, Person t1) throws Exception {
        // Sum the ages of the 5 elements in each count window
        Integer s = person.getPage() + t1.getPage();
        return new Person(person.getPid(), person.getPname(), person.getPsex(), s);
    }
});
reducewindows.print("reducewindows");
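Unlike Window Apply, a window reduce can run incrementally. Only the running reduced value has to be kept per window, not the elements themselves. The sketch models this in plain Java (`CountWindowReduceSketch` is hypothetical). It also shows that a count window that never reaches 5 elements emits nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of an incremental reduce over per-key count windows
final class CountWindowReduceSketch {
    static List<String> reduce(List<Map.Entry<String, Integer>> input, int size, BinaryOperator<Integer> reducer) {
        Map<String, Integer> partial = new HashMap<>(); // running reduced value per key
        Map<String, Integer> counts = new HashMap<>();  // elements in the current window per key
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            String key = e.getKey();
            partial.merge(key, e.getValue(), reducer); // first value stored as-is, then reduce(old, new)
            int count = counts.merge(key, 1, Integer::sum);
            if (count == size) {
                out.add(key + ":" + partial.remove(key)); // fire, then start a fresh window
                counts.remove(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("M", 20), Map.entry("M", 21), Map.entry("F", 30), Map.entry("M", 22),
                Map.entry("M", 23), Map.entry("F", 31), Map.entry("M", 24));
        // countWindow(5): the five "M" ages are summed; the two "F" records never fill a window
        System.out.println(reduce(input, 5, Integer::sum)); // [M:110]
    }
}
```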
Window Fold: applies a FoldFunction to a WindowedStream; each window starts again from the initial value.
DataStream<Integer> windowfold = source.keyBy("psex").countWindow(5).fold(5, new FoldFunction<Person, Integer>() {
    @Override
    public Integer fold(Integer integer, Person o) throws Exception {
        // Result per window: 5 plus the sum of the ages in that window
        return integer + o.getPage();
    }
});
windowfold.print("windowfold");
Window Aggregation: applies an AggregateFunction to a WindowedStream.
SingleOutputStreamOperator<String> windowaggregation = source.keyBy("psex").countWindow(5)
        .aggregate(new AggregateFunction<Person, Integer, String>() {
            // Create a new accumulator for each window
            @Override
            public Integer createAccumulator() {
                return 0;
            }
            // Called once for every incoming element
            @Override
            public Integer add(Person person, Integer integer) {
                return person.getPage() + integer;
            }
            // Called once when the window fires
            @Override
            public String getResult(Integer integer) {
                return integer.toString();
            }
            // Merge two accumulators (used when windows are merged, e.g. session windows)
            @Override
            public Integer merge(Integer integer, Integer acc1) {
                return integer + acc1;
            }
        });
windowaggregation.print("windowaggregation");
Union: merges two or more DataStreams of the same type. All elements from all input streams end up in one new DataStream, and duplicates are not removed.
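// keyBy is a KeyedStream<Person, Tuple>, which is itself a DataStream<Person>, so every Person appears twice
DataStream<Person> union = source.union(keyBy);
union.print("union");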
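The key property of union is that nothing is deduplicated. The plain-Java sketch below simply concatenates its inputs. In real Flink the elements of different inputs interleave in no guaranteed order, so the concatenation order is a simplification. `UnionSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of union: all elements of all inputs are kept, duplicates included
final class UnionSketch {
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> out = new ArrayList<>(first);
        for (List<T> other : others) {
            out.addAll(other);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(union(List.of("a", "b"), List.of("b", "c"))); // [a, b, b, c]
    }
}
```

Unioning a stream with itself, as source.union(keyBy) effectively does, therefore doubles the number of elements.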
Window Join: joins two DataStreams on a key within windows that cover the same time range; the output is a DataStream.
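// The Tuple7 field types were lost in the original. Assumed here: pid and tid Integer, pname and psex String,
// page Integer, tem and times String; adjust them to the real Person and Times classes (pid and tid must share a type)
DataStream<Tuple7<Integer, String, String, Integer, Integer, String, String>> join = source.join(timesSource)
        .where(person -> person.getPid())
        .equalTo(times -> times.getTid())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction<Person, Times, Tuple7<Integer, String, String, Integer, Integer, String, String>>() {
            @Override
            public Tuple7<Integer, String, String, Integer, Integer, String, String> join(Person person, Times times) throws Exception {
                // Combine a matching Person and Times pair into one tuple
                return new Tuple7<>(person.getPid(), person.getPname(), person.getPsex(), person.getPage(),
                        times.getTid(), times.getTem(), times.getTimes());
            }
        });
join.print("join");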
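A window join is an inner join. Two records are paired only if their keys match and they fall into the same window; every such pair is emitted, and records without a partner are dropped. The plain-Java sketch checks both conditions with a nested loop, using tumbling windows identified by `floorDiv(timestamp, size)` (offset 0 assumed). `WindowJoinSketch` and its `Rec` class are hypothetical, and `Rec.key` plays the role of pid / tid.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a tumbling-window inner join
final class WindowJoinSketch {
    static final class Rec {
        final int key;        // join key (pid on one side, tid on the other)
        final long timestamp; // milliseconds
        final String payload;

        Rec(int key, long timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    static List<String> join(List<Rec> left, List<Rec> right, long size) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameWindow = Math.floorDiv(l.timestamp, size) == Math.floorDiv(r.timestamp, size);
                if (l.key == r.key && sameWindow) {
                    out.add(l.payload + "+" + r.payload); // like JoinFunction.join(left, right)
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> people = List.of(new Rec(1, 1_000, "Tom"), new Rec(2, 1_500, "Ann"), new Rec(1, 4_000, "Tom2"));
        List<Rec> times = List.of(new Rec(1, 2_000, "t1"), new Rec(1, 3_500, "t2"), new Rec(3, 1_000, "t3"));
        // 3-second windows as in the join above; Ann and t3 have no partner and are dropped
        System.out.println(join(people, times, 3_000)); // [Tom+t1, Tom2+t2]
    }
}
```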
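Execute the job (the operators above only build the dataflow; nothing runs until execute() is called):
environment.execute();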



