栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink——Transformations的案例

Flink——Transformations的案例

创建flink初始环境

//配置WEBUI
        Configuration configuration=new Configuration();
        configuration.setInteger(RestOptions.PORT,8848);
        //创建flink执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        environment.setParallelism(4);
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream source = environment.addSource(new DataDB());
        source.print("source");
        DataStream timesSource = environment.addSource(new TimeDB());
        timesSource.print("timesource");

Map 接受一个元素并产生一个元素。将输入流的值加倍的映射函数

  DataStream map = source.map(new MapFunction() {
        @Override
        public String map(Person person) throws Exception {
            String log=person.getPname()+","+person.getPage()+","+person.getPsex()+","+person.getPid();
            return log;
        } 
    });
    map.print("map");

FlatMap 接受一个元素并产生零个、一个或多个元素。

  DataStream flatMap = map.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String s, Collector collector) throws Exception {
                for(String word:s.split(",")){
                    collector.collect(word);
                }
            }
        });
        flatMap.print("flatMap");

Filter 过滤数据,如果返回true则该元素继续向下传递,如果为false则将该元素过滤掉

 DataStream filter = source.filter(new FilterFunction() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.getPage() > 18;
            }
        });
        filter.print("filter");

keyBy 将数据流元素进行逻辑上的分组,具有相同Key的记录将被划分到同一分组。KeyBy()使用Hash Partitioner实现。

KeyedStream keyBy = source.keyBy("psex");`

Reduce 按照KeyedStream中的逻辑分组,将当前数据与最后一次的Reduce结果进行合并,合并逻辑由开发者自己实现该类运算应用在KeyedStream上

 DataStream reduce = keyBy.reduce(new ReduceFunction() {
            @Override
            public Person reduce(Person person, Person t1) throws Exception {
                int a = person.getPage() + t1.getPage();
                return new Person(person.getPid(), person.getPname(), person.getPsex(), a);
            }
        });
        reduce.print("reduce");

Fold Fold与Reduce类似,区别在于Fold是一个提供了初始值的Reduce,用初始值进行合并运算

DataStream fold = keyBy.fold(3, new FoldFunction() {
        @Override
        public Integer fold(Integer integer, Person o) throws Exception {
            return integer + o.getPage();
        }
    });
    fold.print("fold");

Aggregation 渐进聚合具有相同Key的数据流元素,以min和minBy为例,min返回的是整个KeyedStream的最小值,minBy按照Key进行分组,返回每个分组的最小值。

		keyBy.sum("page").print("sum");
        keyBy.min("page").print("min");
        keyBy.max("page").print("max");
        keyBy.minBy("page").print("minBy");
        keyBy.maxBy("page").print("maxBy");

Window 对KeyedStream的数据,按照Key进行时间窗口切分,如每5秒钟一个滚动窗口,每个key都有自己的窗口。
TumblingEventTimeWindows根据事件自带时间,TumblingProcessingTimeWindows根据系统时间

DataStream aggregate = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
            .aggregate(new AggregateFunction() {
                //初始化一个累加器
                @Override
                public Integer createAccumulator() {
                    Integer integer = 0;
                    return integer;
                }

                //中间结果,来一条执行一次
                @Override
                public Integer add(Person person, Integer integer) {
                    return person.getPage() + integer;
                }

                //在窗口结束的时候执行一次
                @Override
                public String getResult(Integer integer) {
                    return integer.toString();
                }

                //累加结果
                @Override
                public Integer merge(Integer integer, Integer acc1) {
                    return integer + acc1;
                }
            });
    aggregate.print("aggregate");

WindowAll 对一般的DataStream进行时间窗口切分,即全局1个窗口,如每5秒钟一个滚动窗口。应用在DataStream上

 DataStream windowAll = source.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
                .reduce(new ReduceFunction() {
                    @Override
                    public Person reduce(Person person, Person t1) throws Exception {
                        Integer s = person.getPage() + t1.getPage();
                        return new Person(person.getPid(), person.getPname(), person.getPsex(), s);
                    }
                });
        windowAll.print("windowAll");

Window Apply 将Window函数应用到窗口上,Window函数将一个窗口的数据作为整体进行处理。Window Stream有两种:分组后的WindowedStream和未分组的AllWindowedStream。

DataStream apply = source.keyBy("psex").countWindow(5)
        //触发器 按照个数触发计算
        .trigger(CountTrigger.of(5))
        .apply(new WindowFunction() {
            @Override
            public void apply(Tuple tuple, GlobalWindow globalWindow, Iterable iterable, Collector collector) throws Exception {
                int sum = 0;
                for (Person person : iterable) {
                    sum += person.getPage();
                }
                collector.collect("" + sum);
            }
        });
apply.print("apply");

Window Reduce 在WindowedStream上应用ReduceFunction,输出结果为DataStream

DataStream reducewindows = source.keyBy("psex").countWindow(5).reduce(new ReduceFunction() {
    @Override
    public Person reduce(Person person, Person t1) throws Exception {
        Integer s = person.getPage() + t1.getPage();
        return new Person(person.getPid(), person.getPname(), person.getPsex(), s);
    }
});
reducewindows.print("reducewindows");

Window Fold 在WindowedStream上应用Fold

 DataStream windowfold = source.keyBy("psex").countWindow(5).fold(5, new FoldFunction() {
            @Override
            public Integer fold(Integer integer, Person o) throws Exception {
                return integer + o.getPage();
            }
        });
        windowfold.print("windowfold");

Window Aggregation 在WindowedStream上应用aggregation

    SingleOutputStreamOperator windowaggregation = source.keyBy("psex").countWindow(5)
            .aggregate(new AggregateFunction() {
                //初始化一个累加器
                @Override
                public Integer createAccumulator() {
                    Integer integer = 0;
                    return integer;
                }

                //中间结果,来一条执行一次
                @Override
                public Integer add(Person person, Integer integer) {
                    return person.getPage() + integer;
                }

                //在窗口结束的时候执行一次
                @Override
                public String getResult(Integer integer) {
                    return integer.toString();
                }

                //累加结果
                @Override
                public Integer merge(Integer integer, Integer acc1) {
                    return integer + acc1;
                }
            });
   windowaggregation.print("windowaggregation");

Union 把两个或多个DataStream合并,所有DataStream中的元素都会组合成一个新的DataStream但是不去重

DataStream union = source.union(keyBy);
        union.print("union");

Window Join 在相同时间范围的窗口上Join两个DataStream数据流,输出结果为DataStream。

  DataStream> join = source.join(timesSource)
                .where(person -> person.getPid()).equalTo(times -> times.getTid())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                .apply(new JoinFunction>() {
                    @Override
                    public Tuple7 join(Person person, Times times) throws Exception {
                        return new Tuple7(person.getPid(), person.getPname(), person.getPsex(), person.getPage(), times.getTid(), times.getTem(), times.getTimes());
                    }
                });
        join.print("join");

执行 environment.execute();

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735909.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号