Running Flink locally in IDEA (Java version)

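1. Background

Flink is currently one of the most popular real-time big data frameworks, so I wanted to read its source code and get some hands-on practice with it.

2. Steps

2.1 Environment setup

JDK 1.8 or later is sufficient, since most of Flink is written in Java.

2.2 Create the IDEA project

Create an ordinary Maven Java project; no special steps are needed.

2.3 pom.xml configuration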


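<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FirstFlink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Flink batch (DataSet) API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- Flink streaming (DataStream) API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- Flink clients, needed to run jobs locally -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- Scala APIs, only needed for Scala programs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>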

2.4 Batch program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // Input path and output path (backslashes must be escaped in Java string literals)
        String inPath = "E:\\IdeaProjects\\FirstFlink\\data\\input\\hello.txt";
        String outPath = "E:\\IdeaProjects\\FirstFlink\\data\\output\\output.txt";
        // Get the Flink batch execution environment
        ExecutionEnvironment executionEnvironment =
                ExecutionEnvironment.getExecutionEnvironment();
        // Read the contents of the file, one record per line
        DataSet<String> text = executionEnvironment.readTextFile(inPath);
        // Split lines into (word, 1) pairs, group by the word (field 0), sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> dataSet =
                text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        // Write one record per line, with fields separated by a space
        dataSet.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        // Trigger execution of the program
        executionEnvironment.execute("wordcount batch process");
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector)
                throws Exception {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

Contents of hello.txt:

hello flink
hello zk
hello spark

Output (output.txt):

zk 1
flink 1
hello 3
spark 1
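
To make the flatMap, groupBy(0), sum(1) chain easier to follow, here is a minimal sketch of the same computation in plain Java collections. It does not use Flink at all; the class name WordCountSketch and the method count are hypothetical names chosen for this illustration, and the result order is not meant to match Flink's output order.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK illustration only; no Flink classes are involved.
class WordCountSketch {

    // Mirrors the batch job: split each line into words (flatMap),
    // group identical words (groupBy) and add up a 1 per occurrence (sum).
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same three lines as hello.txt
        List<String> lines = Arrays.asList("hello flink", "hello zk", "hello spark");
        count(lines).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

For the three lines of hello.txt this yields the same totals as output.txt: hello 3, and 1 each for flink, zk and spark.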

2.5 Streaming program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // Get the Flink streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Read input from a socket; records are delimited by newlines
        DataStreamSource<String> textStream =
                streamExecutionEnvironment.socketTextStream("hadoop2", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> sum =
                textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector)
                    throws Exception {
                // Split on whitespace and emit (word, 1) for each word
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        }).keyBy(0).sum(1); // key by the word (field 0), keep a running sum of field 1
        // Print the results
        sum.print();
        // Trigger execution of the job
        streamExecutionEnvironment.execute("wordcount stream process");
    }
}

1) On the hadoop2 machine, first run:

# Listen on port 7777
nc -lp 7777

2) Run the main method of WordCountStream.
3) In the nc session on hadoop2, type:

hello flink hello flink hello hello
hello spark stream flink hello

4) IDEA console output:

5> (hello,1)
13> (flink,1)
5> (hello,2)
5> (hello,3)
13> (flink,2)
5> (hello,4)
13> (flink,3)
5> (hello,5)
1> (spark,1)
5> (hello,6)
16> (stream,1)
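
The console output shows a new total every time a word arrives, rather than one final total per word. The following plain-Java sketch mimics that per-record behaviour of keyBy(0).sum(1). It is only an illustration without Flink: RunningCountSketch and emit are hypothetical names, and it runs in a single thread, so it has no "N>" prefix (in Flink that prefix identifies the parallel subtask that printed the record, which is why the same word always carries the same number) and no interleaving between subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK illustration only; no Flink classes are involved.
class RunningCountSketch {

    // For every incoming word, update that word's total and emit the new value,
    // the way a keyed running sum emits one result per input record.
    static List<String> emit(List<String> lines) {
        Map<String, Long> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {
                long n = totals.merge(word, 1L, Long::sum);
                out.add("(" + word + "," + n + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same two lines that were typed into nc
        List<String> lines = Arrays.asList(
                "hello flink hello flink hello hello",
                "hello spark stream flink hello");
        emit(lines).forEach(System.out::println);
    }
}
```

With the two input lines above, the totals end at (hello,6), (flink,3), (spark,1) and (stream,1), the same final values as in the IDEA console output.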

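Note: the first run of the streaming program fails with an error, which is fixed by ticking an option in the IDEA run configuration. Because flink-streaming-java_2.12 is declared with provided scope, the option is most likely "Include dependencies with 'Provided' scope" under Run > Edit Configurations.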
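
The exact error message is not given, so the following is an assumption: if the failure is a NoClassDefFoundError, a likely cause is that dependencies declared with provided scope are not put on the classpath when the program is launched from the IDE. A minimal sketch of a check you could run from the same run configuration is shown below; ClasspathCheck and isOnClasspath are hypothetical names used only for this illustration.

```java
// Diagnostic sketch: reports whether a class can be found on the current classpath.
class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // Look the class up without initializing it
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class imported by WordCountStream; it comes from flink-streaming-java_2.12
        String name = "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this reports false when started from IDEA, the streaming dependency is missing at run time, and either the run configuration or the dependency scope needs to change.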

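3. Summary and improvements
  1. To create a Flink Java batch or streaming program in IDEA, you only need to add the corresponding dependency, flink-java or flink-streaming-java_2.12, plus flink-clients_2.12.
  2. Creating a Scala version of the program is similar to the Java version.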