栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink流处理示例开发

flink流处理示例开发

flink流处理示例开发
    • 一、版本和开发工具
    • 二、Flink 程序开发步骤
    • 三、开发示例
    • 四、运行

flink处理分流处理(streaming)和批处理(batch)。本示例来展示flink流处理开发的一般步骤和方法。

一、版本和开发工具

flink版本:1.13.3
开发工具:Intellij IDEA
Java版本:1.8.0_261

二、Flink 程序开发步骤

1、获得一个执行环境
2、加载或者创建初始化数据
3、指定操作数据的Transaction算子
4、指定计算好的数据的存放位置
5、调用execute()触发程序执行

三、开发示例

1、创建项目,填写项目名称,存放路径、包名、版本号等

2、添加依赖,在本机注释掉scope,当发布到flink集群时,需要添加scope,因为集群已经存在这些依赖


   8
   8
   UTF-8
   1.13.3
   2.12


   
      org.apache.flink
      flink-java
      ${flink.version}
      
   

   
      org.apache.flink
      flink-streaming-java_${scala.version}
      ${flink.version}
      
  

3、开发需求:通过Socket模拟产生单词,实现每隔1s对最近2s内的数据进行汇总计算
4、java程序结构如下,代码下载地址:Flink开发示例源代码

5、核心代码如下

public static void main(String[] args) throws Exception {
        // 获取socket端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.out.println("没有设置端口号。使用默认端口号9109");
            port = 9109;
        }

        // 获取Flink 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        String hostname = "node01";
        String delimiter = "n";
        DataStreamSource text = env.socketTextStream(hostname, port, delimiter);

        // timeWindow 已过时,建议使用window
        DataStream wordCounts = text.flatMap(new FlatMapFunction() {
                @Override
                public void flatMap(String s, Collector collector) throws Exception {
                    String[] splits = s.split("\s");
                    for (String word : splits) {
                        collector.collect(new WordCount(word, 1L));
                    }
                }
            }).keyBy(new StreamWordCoutKeySelector())
            .window(SlidingProcessingTimeWindows.of(Time.seconds(2),Time.seconds(1)))
            .sum("count");

        // 把数据打印到控制台并且设置并行度
        wordCounts.print().setParallelism(1);

        // 触发执行程序
        env.execute("Socket window count");
    }
四、运行

1、打开终端,输入命令:nc -l 9109

2、运行SocketWindowWorldCount,会出现异常,直接退出。如下图所示,查看日志有两处错误:
1)是由于依赖中缺少slf4j的依赖,添加依赖slf4j-simple
2)缺少flink执行环境的依赖,添加flink客户端依赖


    org.apache.flink
    flink-clients_${scala.version}
    ${flink.version}
    


   org.slf4j
   slf4j-simple
   1.7.32

3、再次运行,在终端输入单词,可以看到统计的单词个数

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663586.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号