- 首先数据源是流式读取文件内容
- 对每行句子按照空格切分
- 将每个单词都构造为一个Tuple,第一个位置是单词,第二个位置是词频
- 按照key(单词)分组,对每个组做聚合(reduce)操作
- 将结果输出
在maven项目的resources下新建一个文件hello.txt,内容如下:
hello world hello flink hello spark When we have shuffled off this mortal coil When we have shuffled off this mortal coil
代码如下:
package transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountKeyBy {
public static void main(String[] args) throws Exception {
// 1.创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.从文件中读取数据
DataStream dataStream = env.readTextFile("src/main/resources/hello.txt");
// 执行环境并行度设置3
env.setParallelism(3);
// 3.按照空格分词
DataStream> sensorStream = dataStream.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] wordString = value.split(" ");
for (String wordLine : wordString) {
out.collect(new Tuple2<>(wordLine, 1));
}
}
});
// 4.分组
KeyedStream, Object> key = sensorStream.keyBy(tuple -> tuple.f0);
// 5.聚合
SingleOutputStreamOperator> resultStream = key.sum(1);
resultStream.print();
//执行
env.execute();
}
}
附:pom.xml
4.0.0 org.myorg.quickstart quickstart 0.1 jar Flink Quickstart Job UTF-8 1.13.2 1.8 2.11 ${target.java.version} ${target.java.version} 2.12.1 apache.snapshots Apache Development Snapshot Repository https://repository.apache.org/content/repositories/snapshots/ false true org.apache.flink flink-java ${flink.version} compile org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} compile org.apache.flink flink-clients_${scala.binary.version} ${flink.version} compile org.apache.logging.log4j log4j-slf4j-impl ${log4j.version} runtime org.apache.logging.log4j log4j-api ${log4j.version} runtime org.apache.logging.log4j log4j-core ${log4j.version} runtime org.apache.maven.plugins maven-compiler-plugin 3.1 ${target.java.version} ${target.java.version} org.apache.maven.plugins maven-shade-plugin 3.1.1 package shade org.apache.flink:force-shading com.google.code.findbugs:jsr305 org.slf4j:* org.apache.logging.log4j:* *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA org.myorg.quickstart.StreamingJob org.eclipse.m2e lifecycle-mapping 1.0.0 org.apache.maven.plugins maven-shade-plugin [3.1.1,) shade org.apache.maven.plugins maven-compiler-plugin [3.1,) testCompile compile



