栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink练习第三天:转换算子练习--map、flatmap、filter、reduce、simpleagg

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink练习第三天:转换算子练习--map、flatmap、filter、reduce、simpleagg

目录

map

flatmap

filter

reduce

simpleagg

map

map
package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Demonstrates the Flink {@code map} transformation by projecting each {@code Event}
 * onto its {@code user} field.
 *
 * <p>The same projection is written twice — once as an anonymous {@link MapFunction}
 * and once as a lambda — and both resulting streams are printed.
 */
public class TransformMapTest {
    /**
     * Builds a three-element event stream, maps it to user names in two ways and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Tom", "./prod", 3000L));

        // Variant 1: implement MapFunction with an anonymous class.
        // The type arguments are required so that map(Event) really overrides the interface method.
        SingleOutputStreamOperator<String> map1 = stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event event) throws Exception {
                return event.user;
            }
        });
        map1.print();

        // Variant 2: the same projection as a lambda expression.
        SingleOutputStreamOperator<String> map2 = stream.map(data -> data.user);
        map2.print();

        env.execute();
    }
}

flatmap
package com.atguigu.chapter05;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * Demonstrates the Flink {@code flatMap} transformation, which may emit zero, one or
 * several output records per input record.
 *
 * <p>For user "Mary" only the url is emitted; for user "Bob" the user, url and timestamp
 * are emitted as three separate strings; every other event produces no output.
 */
public class TransformFlatMapTest {
    /**
     * Builds a three-element event stream, applies the flatMap and prints the result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Tom", "./prod", 3000L));

        stream.flatMap((Event value, Collector<String> out) -> {
            // Constant-first equals so an event with a null user is skipped instead of throwing.
            if ("Mary".equals(value.user)) {
                out.collect(value.url);
            } else if ("Bob".equals(value.user)) {
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timestamp.toString());
            }
        })
        // The Collector's element type is erased from the lambda, so the output type
        // has to be supplied explicitly for Flink's type extraction.
        .returns(new TypeHint<String>() {
        }).print();

        env.execute();
    }
}

filter
package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Demonstrates the Flink {@code filter} transformation by keeping only the events
 * whose user is "Mary".
 */
public class TransformFilterTest {
    /**
     * Builds a three-element event stream, filters it and prints the surviving events
     * with the prefix "Mary click".
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Tom", "./prod", 3000L));

        // Constant-first equals so an event with a null user is dropped instead of throwing.
        stream.filter(data -> "Mary".equals(data.user)).print("Mary click");

        env.execute();
    }
}

reduce
package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransformReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 3000L),
                new Event("Tom", "./home", 3000L),
                new Event("Bob", "./home", 1000L),
                new Event("Tom", "./home", 3000L),
                new Event("Bob", "./prof", 4400L),
                new Event("Tom", "./prod", 5000L));

        //统计每个用户的访问频次
        SingleOutputStreamOperator> clickByUser = stream.map(new MapFunction>() {
                    @Override
                    public Tuple2 map(Event value) throws Exception {
                        return Tuple2.of(value.user, 1L);
                    }
                }).keyBy(data -> data.f0)
                .reduce(new ReduceFunction>() {
                    @Override
                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });

        //根据个数选取最活跃用户
        SingleOutputStreamOperator> result = clickByUser.keyBy(data -> "key")
                .reduce(new ReduceFunction>() {
                    @Override
                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                });

        result.print();
        env.execute();


    }
}

simpleagg
package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Demonstrates Flink's built-in keyed aggregations {@code maxBy} and {@code max}
 * on the {@code timestamp} field of {@code Event}, keyed by user.
 *
 * <p>Per the Flink documentation, {@code max} only tracks the maximum of the named field
 * while {@code maxBy} returns the whole element that holds the maximum; printing both
 * makes the difference visible.
 */
public class TransformSimpleAggTest {
    /**
     * Builds a six-element event stream, applies both aggregations per user and prints them.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 3000L),
                new Event("Bob", "./home", 1000L),
                new Event("Bob", "./prof", 4400L),
                new Event("Tom", "./home", 3000L),
                new Event("Tom", "./prod", 5000L));

        // NOTE(review): addressing the field by name assumes Event is a Flink-compatible
        // POJO exposing a "timestamp" field — confirm against the Event class.
        stream.keyBy(data -> data.user)
                .maxBy("timestamp")
                .print();

        stream.keyBy(data -> data.user)
                .max("timestamp")
                .print();

        env.execute();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/863571.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号