Flink批处理:统计一个文件中各个单词出现的次数,把统计结果输出到文件。
新建maven项目
- 引入依赖(pom.xml 中添加以下三个依赖,groupId 均为 org.apache.flink,版本均为 1.15.1):flink-clients、flink-java、flink-streaming-java(scope 为 compile)。
package com.tong;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountBatch {
public static void main(String[] args) throws Exception {
//定义输入输出路径
String input = "D:\in\123.txt";
String output = "D:\out\123.csv";
//1.获取flink的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.用flink的运行环境,去获取待分析数据
DataSource textValue = env.readTextFile(input);
//3.处理数据,使用flatMap方法进行聚合处理
DataSet> dataSet = textValue.flatMap(new SplitValue())
//b.相同单词聚合到一起,0指的是Tuple2的String位置,如(hello,1)中的hello的位置0
.groupBy(0)
//c.聚合到的数据累加处理,累加处理的的数据,即就是(hello,1)中的1的位置
.sum(1);
//4.保存处理结果,写入到csv中
dataSet.writeAsCsv(output,"n"," ").setParallelism(1);
//5.触发执行程序
env.execute("wordCount batch process");
}
static class SplitValue implements FlatMapFunction> {
//a.将文本内容打散成一个一个单词
@Override
public void flatMap(String line, Collector> collector) throws Exception {
//数据按照空格进行切分
String[] words = line.split(" ");
for (String word : words) {
//将单词拼接成(hello,1)这种样式,方便之后就行聚合计算
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
执行完后,统计结果会写入到指定的 csv 输出文件中(结果截图略)。下面是流处理版本的实现:
package com.tong;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStream {
public static void main(String[] args) throws Exception {
String ip="127.0.0.1";
int port = 7777;
//获取流式数据的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource textStream = env.socketTextStream(ip, port);
SingleOutputStreamOperator> dataStream = textStream.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String line, Collector> collector) throws Exception {
//数据按照空格进行切分
String[] words = line.split(" ");
for (String word : words) {
//将单词拼接成(hello,1)这种样式,方便之后就行聚合计算
collector.collect(new Tuple2<>(word, 1));
}
}
});
SingleOutputStreamOperator> word = dataStream.keyBy(0).sum(1);
word.print();
env.execute("wordCount stream process");
}
}
流处理模仿小工具
netcat(nc)下载地址:https://eternallybored.org/misc/netcat/
启动使用:https://blog.csdn.net/q_a_z_w_s_x___/article/details/115327163
运行结果如下



