栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink简单示例

flink简单示例

示例1. WordCount

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 创建flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //调用Source生成DataStream
        DataStreamSource line = env.socketTextStream("linux01", 1993);
        //调用transformation
        SingleOutputStreamOperator> wordAndOne = line.flatMap(new LineSplitor());
        KeyedStream, Tuple> keyed = wordAndOne.keyBy(0);
        SingleOutputStreamOperator> result = keyed.sum(1);
        //调用SinK
        result.print();
        //启动
        env.execute("WordCount");
    }
    //使用内部静态类传参数 
    private static class LineSplitor extends RichFlatMapFunction>{
        @Override
        public void flatMap(String line, Collector> collector) throws Exception {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word,1));
            }
        }
    }
}

示例2. MapFunction和RichMapFunction的使用

/**
 * Demonstrates {@code MapFunction} versus {@code RichMapFunction}.
 *
 * <p>Reads lines from {@code linux01:7777}, upper-cases them and prints the result. The rich
 * variant also logs when its {@code open}, {@code map} and {@code close} methods run, together
 * with the index of the subtask executing them.
 */
public class MapFunctionDemo {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: every line received on the socket becomes one String element.
        DataStreamSource<String> line = env.socketTextStream("linux01", 7777);
        // Transformation. Swap in MyMapFunction to compare with the plain (non-rich) variant:
        // SingleOutputStreamOperator<String> upper = line.map(new MyMapFunction());
        SingleOutputStreamOperator<String> upper = line.map(new MyRichFunction());
        // Sink: print the upper-cased lines.
        upper.print();
        // Submit and run the job.
        env.execute("MapFunctionDemo");
    }

    /** Upper-cases each line and logs the open/map/close lifecycle per subtask. */
    private static class MyRichFunction extends RichMapFunction<String, String> {
        /**
         * Runs once per subtask before any record is processed.
         *
         * <p>A typical place to open resources such as a database connection.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // Index of the parallel subtask running this function instance.
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("open 方法被执行了->"+indexOfThisSubtask);
        }

        /**
         * Runs once per record.
         *
         * <p>This is where resources opened in {@link #open} would be used.
         */
        @Override
        public String map(String value) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("map方法被执行了->"+indexOfThisSubtask);
            return value.toUpperCase();
        }

        /** Runs once when the subtask shuts down; a typical place to release resources. */
        @Override
        public void close() throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("close方法被执行了->"+indexOfThisSubtask);
        }
    }

    /**
     * Plain map function with no lifecycle hooks or runtime context; it only upper-cases each
     * line.
     */
    private static class MyMapFunction implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    }
}

示例3. 并行度

/**
 * Prints the parallelism of the environment, the socket source, a map operator and a print
 * sink, then runs the job.
 */
public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Uncomment to set the environment-wide parallelism explicitly:
        // env.setParallelism(12);
        // Read the environment's parallelism. When running locally, the default is the number
        // of logical CPU cores (hardware threads) of the machine.
        int parallelism = env.getParallelism();
        System.out.println("当前运行环境的并行度->"+parallelism);
        // Source: every line received on the socket becomes one String element.
        DataStreamSource<String> line = env.socketTextStream("linux01", 7777);
        // A socket source (created by socketTextStream) always has parallelism 1.
        int parallelism1 = line.getParallelism();
        System.out.println("SocketSource创建的DataStream的并行度->"+parallelism1);
        // map can run in parallel. If no parallelism is set on the operator, it defaults to
        // the environment's parallelism.
        SingleOutputStreamOperator<String> upperStream = line.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        int parallelism2 = upperStream.getParallelism();
        System.out.println("map算子所在Task的并行度->"+parallelism2);
        // The print sink can also run in parallel. If no parallelism is set on the operator,
        // it defaults to (is consistent with) the environment's parallelism.
        DataStreamSink<String> streamSink = upperStream.print();
        int parallelism3 = streamSink.getTransformation().getParallelism();
        System.out.println("Sink算子所在Task的并行度->"+parallelism3);
        // Submit and run the job.
        env.execute();
    }
}

示例4. 自己实现print算子的简单功能

/**
 * Shows how to write a simple custom sink that mimics part of {@code print()}.
 *
 * <p>Reads lines from {@code linux01:7777}, upper-cases them, and writes each one to stdout
 * prefixed with a 1-based subtask number.
 */
public class MyPrintSink {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: every line received on the socket becomes one String element.
        DataStreamSource<String> line = env.socketTextStream("linux01", 7777);
        // Transformation: upper-case each line.
        SingleOutputStreamOperator<String> upperStream = line.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        // Sink: use the hand-written print sink instead of print().
        upperStream.addSink(new MyPrint());
        // Submit and run the job.
        env.execute("MyPrintSink");
    }

    /** Minimal print-like sink that writes {@code "<subtask+1> > <value>"} to stdout. */
    private static class MyPrint extends RichSinkFunction<String> {
        /** Called once per record. */
        @Override
        public void invoke(String value, Context context) throws Exception {
            // Runtime context of the subtask executing this sink instance.
            RuntimeContext runtimeContext = getRuntimeContext();
            // Subtask indices are 0-based; print them 1-based.
            int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
            // The parentheses make the integer addition explicit before string concatenation.
            // The output is identical to the unparenthesized original.
            System.out.println((indexOfThisSubtask + 1) + " > " + value);
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734069.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号