maven项目的文本文件与pom.xml配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120555968
同样以wordcount为例
package transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountKeyBy {
public static void main(String[] args) throws Exception {
// 1.创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.从文件中读取数据
DataStream dataStream = env.readTextFile("src/main/resources/hello.txt");
// 执行环境并行度设置3
env.setParallelism(3);
// 3.按照空格分词
DataStream> sensorStream = dataStream.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] wordString = value.split(" ");
for (String wordLine : wordString) {
out.collect(new Tuple2<>(wordLine, 1));
}
}
});
// 4.分组
KeyedStream, Object> key = sensorStream.keyBy(tuple -> tuple.f0);
// 5.聚合
SingleOutputStreamOperator> resultStream = key.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
return new Tuple2(value1.f0, value1.f0.length() + value1.f1);
}
});
resultStream.print();
//执行
env.execute();
}
}
在上面的第5步为自定义的聚合操作,其中:reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) 方法中,value1 是当前已聚合的结果,value2 是新到达的数据。
上面的代码中Tuple第一个位置返回原有的单词,而Tuple第二个位置每次都加一次当前单词的长度



