下面主要是使用 DataSet 的方式去实现,在 Flink 1.14版本之后,DataSet 的方式被弃用,主要开始使用 DataStream 的方式
1. env 环境准备ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();2. Source 加载数据
用 , 分隔表示两行数据
DataSet3. transformation 数据转换处理lineDS = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?");
这一步是最关键的一部,大致经过4个步骤。
切割、标记、分组、聚合 3.1 切割
关键函数:flatMap,继承 FlatMapFunction,FlatMapFunction有两个参数(String 类型),分别代表输入和输出
DataSet3.2 标记words = lineDS.flatMap(new FlatMapFunction () { @Override public void flatMap(String value, Collector out) throws Exception { String[] arrStr = value.split(" "); // 以空格当做分隔符 for(String s:arrStr){ out.collect(s); } } });
map方法的功能是标记,源代码为
publicMapOperator map(MapFunction mapper) { if (mapper == null) { throw new NullPointerException("Map function must not be null."); } else { String callLocation = Utils.getCallLocationName(); TypeInformation resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType(), callLocation, true); return new MapOperator(this, resultType, (MapFunction)this.clean(mapper), callLocation); } }
MapFunction 是一个接口,其中有唯一一个抽象方法,源代码为
public interface MapFunctionextends Function, Serializable { O map(T var1) throws Exception; }
Tuple2 表示二维元组,Tuple2
下面用匿名函数的方式去实现map方法,完成数据标记的功能
// 数据标记为1,wordAndOne是被标记完成的数据,数据类型为二元组 // MapFunction 方法的功能为数据标记,输入String,输出Tuple2 DataSet3.3 分组> wordAndOne = words.map(new MapFunction >() { @Override public Tuple2 map(String s) throws Exception { // s 表示每一个单词。输入类型String,返回类型Tuple2 return Tuple2.of(s,1); } });
// 分组 -- 对标记完成的数据进行分组。
// 0 表示对Tuple中的第一个参数进行分组
UnsortedGrouping> grouped = wordAndOne.groupBy(0);
3.4 聚合
// 聚合 -- 1 表示对Tuple中的第二个参数进行聚合
AggregateOperator> sum = grouped.sum(1);
4. Sink 数据输出
sum.print();源码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 有两个参数,也就是两行数据
// 父类类型 接收子类对象 是完全可以的
DataSet lineDS = env.fromElements("Who's there?",
"I think I hear them. Stand, ho! Who's there?");
// 数据切割-匿名函数
DataSet words = lineDS.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] arrStr = value.split(" ");
for(String s:arrStr){
out.collect(s);
}
}
});
// 数据标记为1,wordAndOne是被标记完成的数据
// MapFunction 方法的功能为数据标记,输入String,输出Tuple2
DataSet> wordAndOne = words.map(new MapFunction>() {
@Override
public Tuple2 map(String s) throws Exception {
// s 表示每一个单词。输入类型String,返回类型Tuple2
return Tuple2.of(s,1);
}
});
// 分组 -- 对标记完成的数据进行分组。
// 0 表示对Tuple中的第一个参数进行分组
UnsortedGrouping> grouped = wordAndOne.groupBy(0);
// 聚合 -- 1 表示对Tuple中的第二个参数进行聚合
AggregateOperator> sum = grouped.sum(1);
try {
sum.print();
} catch (Exception e) {
e.printStackTrace();
}
}
}
依赖
4.0.0 org.example WordCount1.0-SNAPSHOT 1.8 1.8 org.apache.flink flink-java1.13.1 compile log4j log4j1.2.17 runtime org.slf4j slf4j-log4j121.7.33 runtime org.apache.flink flink-clients_2.121.13.1 compile org.slf4j slf4j-api1.7.33 compile org.apache.maven.plugins maven-shade-plugin3.0.0 package shade *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA



