Flink is one of the hottest real-time big-data frameworks right now, so I want to read its source code and try it out hands-on.
2. Steps

2.1 Environment preparation

JDK 1.8+ is sufficient, since Flink is mostly written in Java.
2.2 Create the IDEA project

This is exactly the same as creating an ordinary Maven Java project; there is no difference.
2.3 pom.xml configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FirstFlink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

2.4 Batch program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // The first path is the input file, the second is the output file
        String inPath = "E:\\IdeaProjects\\FirstFlink\\data\\input\\hello.txt";
        String outPath = "E:\\IdeaProjects\\FirstFlink\\data\\output\\output.txt";
        // Obtain the Flink batch execution environment
        ExecutionEnvironment executionEnvironment =
                ExecutionEnvironment.getExecutionEnvironment();
        // Read the file contents
        DataSet<String> text = executionEnvironment.readTextFile(inPath);
        // Split lines into (word, 1) pairs, group by word, and sum the counts
        DataSet<Tuple2<String, Integer>> dataSet =
                text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        // Write one "word count" pair per line, fields separated by a space
        dataSet.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        // Trigger program execution
        executionEnvironment.execute("wordcount batch process");
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
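For intuition, the flatMap → groupBy(0) → sum(1) pipeline above computes the same result as a plain-Java word count. The following is a local sketch without Flink; the class and method names are invented for illustration:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountLocal {
    // Equivalent of flatMap (split into words) + groupBy(0) + sum(1), without Flink
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum); // add 1 to this word's running total
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same input as hello.txt below
        System.out.println(wordCount(List.of("hello flink hello zk hello spark")));
        // {hello=3, flink=1, zk=1, spark=1}
    }
}
```

Flink distributes this grouping and summing across parallel subtasks, but the per-word result is identical.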
Contents of hello.txt:

hello flink hello zk hello spark

Output result (output.txt):

zk 1
flink 1
hello 3
spark 1

2.5 Stream program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // Obtain the Flink stream execution environment
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Read input from a socket, one record per line
        DataStreamSource<String> textStream =
                streamExecutionEnvironment.socketTextStream("hadoop2", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> sum =
                textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                        String[] splits = s.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(0).sum(1);
        // Print the running counts
        sum.print();
        // Trigger job execution
        streamExecutionEnvironment.execute("wordcount stream process");
    }
}
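The keyBy(0).sum(1) step keeps running state per key: every incoming (word, 1) record updates that word's total, and a record carrying the updated total is emitted downstream, which is why the console output shows (hello,1), (hello,2), (hello,3), and so on. A minimal sketch of that semantics in plain Java (no Flink; the RunningSum class is invented for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningSum {
    // Per-key state, analogous to Flink's keyed state behind keyBy(0).sum(1)
    private final Map<String, Long> state = new HashMap<>();

    // For each incoming (word, 1) record, update the key's total and emit the new total
    List<Map.Entry<String, Long>> process(String... words) {
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (String w : words) {
            long total = state.merge(w, 1L, Long::sum);
            emitted.add(Map.entry(w, total));
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningSum rs = new RunningSum();
        System.out.println(rs.process("hello", "flink", "hello"));
        // [hello=1, flink=1, hello=2] -- one updated total per input record
    }
}
```

In Flink the state additionally survives across records arriving at different times and is partitioned across parallel subtasks by key, but the update-and-emit pattern is the same.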
1) On the hadoop2 machine, first run:

# Listen on port 7777
nc -lp 7777

2) Start the main method of WordCountStream.

3) At the nc prompt on hadoop2, type:

hello flink hello flink hello hello hello spark stream flink hello
4) The IDEA console prints (the N> prefix is the index of the parallel subtask that produced the record):

5> (hello,1)
13> (flink,1)
5> (hello,2)
5> (hello,3)
13> (flink,2)
5> (hello,4)
13> (flink,3)
5> (hello,5)
1> (spark,1)
5> (hello,6)
16> (stream,1)
3. Summary and notes

Note: the first run of the stream program fails, because flink-streaming-java_2.12 is declared with provided scope and is therefore missing from the runtime classpath; in the IDEA Run/Debug Configuration, check "Include dependencies with 'Provided' scope" and run again.
- To build Flink Java batch/stream programs in IDEA, you only need to add the corresponding dependencies, flink-java / flink-streaming-java_2.12, plus flink-clients_2.12.
- Creating the Scala version of the program is similar to the Java version.



