Business requirement: count the occurrences of each word below and print the result.
hadoop spark flink hadoop spark flink hadoop spark flink hadoop spark flink hadoop spark flink hadoop spark flink
1. Code implementation
package flink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount0 {
public static void main(String[] args) throws Exception {
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. Read the data from the file (backslashes in a Java string literal must be escaped)
DataStreamSource<String> inputDataStream = env.readTextFile("H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt");
// 3. Compute: split each line into (word, 1) pairs, key by the word (field 0), sum the counts (field 1)
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}).keyBy(0)
.sum(1);
// 4. Output
resultDataStream.print();
// 5. Start the job
env.execute();
}
}
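Because this is a streaming job, keyBy(0).sum(1) does not print a single final table. Each incoming word produces an updated (word, count) record. The plain-Java sketch below imitates that rolling output for a single parallel instance. It has no Flink dependency, its class name is hypothetical, and a HashMap stands in for Flink's per-key state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingWordCountSketch {
    // Imitates flatMap -> keyBy(word) -> sum(count) with parallelism 1:
    // every input word emits one record holding that word's running total
    static List<String> rollingCounts(List<String> lines) {
        Map<String, Integer> runningTotals = new HashMap<>(); // stands in for Flink's per-key state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                int total = runningTotals.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + total + ")"); // same shape as Tuple2.toString()
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hadoop spark flink", "hadoop spark flink");
        // Prints (hadoop,1) (spark,1) (flink,1) (hadoop,2) (spark,2) (flink,2), one per line
        rollingCounts(lines).forEach(System.out::println);
    }
}
```

Given the sample data above, the sketch emits 18 records, one per word. The last three carry the final totals: (hadoop,6), (spark,6) and (flink,6).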
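2. Optimization 1: use an object-oriented approach
Improvement: treating each record as an object makes operations on data with many fields more convenient.
2.1 Custom object data structure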
public class WordAndCount {
private String word;
private int count;
public WordAndCount() {
}
public WordAndCount(String word, int count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "WordAndCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
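2.2 Implementing the business logic in the main method
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. Read the data
DataStreamSource<String> inputDataStream = env.readTextFile("H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt");
// 3. Flatten + group + sum; "word" and "count" are POJO field names of WordAndCount
SingleOutputStreamOperator<WordAndCount> resultData = inputDataStream.flatMap(new FlatMapFunction<String, WordAndCount>() {
@Override
public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
String[] fields = line.split(" ");
for (String word : fields) {
out.collect(new WordAndCount(word, 1));
}
}
}).keyBy("word").sum("count");
resultData.print();
// 4. Start the job
env.execute();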
}
}
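keyBy("word") and sum("count") address fields by name. That works only if Flink recognises WordAndCount as a POJO. Roughly, this means:

- a public standalone (or static nested) class,
- with a public no-argument constructor,
- whose non-static, non-transient fields are either public or reachable through a getter and a setter.

Otherwise Flink treats the class as a generic type, and string field expressions cannot be resolved. The sketch below is a simplified, hypothetical reflection check of those rules. It is not Flink's own type extraction and it ignores field-type constraints.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRuleCheck {
    // Copy of the WordAndCount structure above, nested as a static class so the sketch is self-contained
    public static class WordAndCount {
        private String word;
        private int count;
        public WordAndCount() { }
        public WordAndCount(String word, int count) { this.word = word; this.count = count; }
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    // Counter-example: no public no-argument constructor
    public static class NoDefaultCtor {
        private String word;
        public NoDefaultCtor(String word) { this.word = word; }
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
    }

    // Simplified approximation of Flink's POJO rules (hypothetical helper, not a Flink API)
    static boolean looksLikePojo(Class<?> c) {
        int mods = c.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        // Nested classes must be static (no hidden reference to an outer instance)
        if (c.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        try {
            c.getConstructor(); // looks up a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : c.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || Modifier.isPublic(fm)) continue;
            String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            boolean getter = hasPublicMethod(c, "get" + cap)
                    || (f.getType() == boolean.class && hasPublicMethod(c, "is" + cap));
            boolean setter = hasPublicMethod(c, "set" + cap, f.getType());
            if (!getter || !setter) return false;
        }
        return true;
    }

    static boolean hasPublicMethod(Class<?> c, String name, Class<?>... params) {
        try {
            c.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WordAndCount.class));  // true
        System.out.println(looksLikePojo(NoDefaultCtor.class)); // false
    }
}
```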
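3. Optimization 2: extract the business logic
Improvement: the core business operator is implemented on its own, which makes the code easier to read.
3.1 Custom object data structure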
Identical to the WordAndCount class in 2.1.
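3.2 Extracting the business logic
// Nested inside the WordCount class (as in 5.3): splits each line on spaces and emits (word, 1) records
public static class SplitLine implements FlatMapFunction<String, WordAndCount> {
@Override
public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
String[] fields = line.split(" ");
for (String word : fields) {
out.collect(new WordAndCount(word, 1));
}
}
}
3.3 Implementing the main method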
public static void main(String[] args) throws Exception {
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. Read the data
DataStreamSource<String> inputDataStream = env.readTextFile("H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt");
// 3. Flatten + group + sum, using the extracted SplitLine operator
SingleOutputStreamOperator<WordAndCount> resultData = inputDataStream.flatMap(new SplitLine()).keyBy("word").sum("count");
// 4. Print the output
resultData.print();
// 5. Start the job
env.execute();
}
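Moving the splitting logic into SplitLine has a practical payoff: the logic can be checked without starting Flink. The plain-Java sketch below mirrors SplitLine's loop in a hypothetical static helper. It contrasts line.split(" ") with a whitespace-tolerant variant that calls trim(), splits on "\\s+" and skips blank lines. The variant is a suggestion, not what the original code does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLineSketch {
    // Same logic as SplitLine.flatMap: split on a single space and emit every token
    static List<String> splitOnSingleSpace(String line) {
        return new ArrayList<>(Arrays.asList(line.split(" ")));
    }

    // Variant: treat any run of whitespace as a separator and ignore blank lines
    static List<String> splitOnWhitespace(String line) {
        List<String> words = new ArrayList<>();
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return words; // a blank line yields no words instead of one empty word
        }
        words.addAll(Arrays.asList(trimmed.split("\\s+")));
        return words;
    }

    public static void main(String[] args) {
        String line = "hadoop  spark flink"; // note the double space
        System.out.println(splitOnSingleSpace(line)); // [hadoop, , spark, flink]
        System.out.println(splitOnWhitespace(line));  // [hadoop, spark, flink]
    }
}
```

With split(" "), a double space produces an empty token, and a blank line ("".split(" ") returns [""]) produces one empty "word". The original job would count both as words.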
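4. Optimization 3: pass the data source path as a parameter
Improvement: when a program needs input arguments, Flink recommends using the ParameterTool it provides.
4.1 Custom object data structure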
Identical to the WordAndCount class in 2.1.
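4.2 Extracting the business logic
// Nested inside the WordCount class (as in 5.3): splits each line on spaces and emits (word, 1) records
public static class SplitLine implements FlatMapFunction<String, WordAndCount> {
@Override
public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
String[] fields = line.split(" ");
for (String word : fields) {
out.collect(new WordAndCount(word, 1));
}
}
}
4.3 Implementing the main method with custom argument passing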
public static void main(String[] args) throws Exception {
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. Read the data
// ParameterTool (org.apache.flink.api.java.utils.ParameterTool) is Flink's utility for reading program arguments
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String path = parameterTool.get("path");
DataStreamSource<String> dataStream = env.readTextFile(path);
// 3. Flatten + group + sum
SingleOutputStreamOperator<WordAndCount> resultData = dataStream.flatMap(new SplitLine()).keyBy("word").sum("count");
// 4. Print the output
resultData.print();
// 5. Start the job
env.execute();
}
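Passing the argument: ParameterTool.fromArgs expects program arguments of the form --path <path to wordcount.txt>.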
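ParameterTool.fromArgs reads arguments in --key value (or -key value) form. Note that parameterTool.get("path") simply returns null when --path is missing, so the failure only shows up later when the file is read. ParameterTool also offers getRequired("path"), which fails immediately instead. The plain-Java sketch below is a simplified, hypothetical imitation of this parsing plus a fail-fast lookup. It is not Flink's implementation, which handles more cases, such as negative numbers as values.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
    // Simplified "--key value" / "-key value" parsing; a key with no value maps to "" here
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("-")) {
                throw new IllegalArgumentException("Expected --key or -key but got: " + arg);
            }
            String key = arg.startsWith("--") ? arg.substring(2) : arg.substring(1);
            // Take the next token as the value unless it is another key
            if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                params.put(key, args[++i]);
            } else {
                params.put(key, "");
            }
        }
        return params;
    }

    // Fail fast with a clear message instead of passing null on to readTextFile
    static String required(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument --" + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Hypothetical example path
        Map<String, String> params = parse(new String[] {"--path", "/tmp/wordcount.txt"});
        System.out.println(required(params, "path")); // /tmp/wordcount.txt
    }
}
```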
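5. Best practice for production code
5.1 pom file configuration
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.9.3</flink.version>
        <parquet.version>1.10.0</parquet.version>
        <hadoop.version>2.7.3</hadoop.version>
        <fastjson.version>1.2.72</fastjson.version>
        <redis.version>2.9.0</redis.version>
        <mysql.version>5.1.35</mysql.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.build.scope>compile</project.build.scope>
        <mainClass>com.hainiu.Driver</mainClass>
    </properties>
    <dependencies>
        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hbase_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${redis.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>${parquet.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>${parquet.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version><scope>${project.build.scope}</scope></dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
5.2 Custom object data structure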
package flink.demo;
public class WordAndCount {
private String word;
private int count;
public WordAndCount() {
}
public WordAndCount(String word, int count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "WordAndCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
5.3 Entry class implementation
package flink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. Read the data
// ParameterTool is Flink's utility for reading the program arguments (here: --path <file>)
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String path = parameterTool.get("path");
DataStreamSource<String> dataStream = env.readTextFile(path);
// 3. Flatten + group + sum
SingleOutputStreamOperator<WordAndCount> resultData = dataStream.flatMap(new SplitLine()).keyBy("word").sum("count");
// 4. Print the output
resultData.print();
// 5. Start the job
env.execute();
}
// Business logic extracted: the core operator is implemented as its own class
public static class SplitLine implements FlatMapFunction<String, WordAndCount> {
@Override
public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
String[] fields = line.split(" ");
for (String word : fields) {
out.collect(new WordAndCount(word, 1));
}
}
}
}
5.4 Code directory structure



