Flink is an excellent big-data compute engine for both batch and stream processing. This article implements several versions of the WordCount example using Flink's Java API. For more details, see the official Flink website.
Notes:
- Flink version: 1.13.5
- Flink Web UI address: http://192.168.18.88:7999
- The server needs the nc (netcat) tool. If it is not available, install it with:

```shell
yum -y install netcat
```
2 WordCount Examples

2.1 Project Directory Structure
Create a Maven project; the directory structure is as follows:
The dependencies are as follows:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
```
The complete pom.xml file is as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.mfox</groupId>
    <artifactId>wordcount-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>wordcount-demo</name>
    <url>http://www.example.com</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```

2.3 Create the word.txt File
Create the file src/main/java/cn/mfox/word.txt with the following content:

```
洪七公 王重阳 欧阳锋 黄药师 洪七公 周伯通 郭靖 黄药师 黄蓉 林朝英
```

A screenshot of the file's location and content is shown below:
The code of WordCountByBatch.java is as follows:

```java
package cn.mfox;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountByBatch {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file
        String inputPath = "src/main/java/cn/mfox/word.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        // Split each line on spaces into (word, 1) tuples, then group and aggregate
        // groupBy(0): group by the word in position 0
        // sum(1): sum the counts in position 1
        DataSet<Tuple2<String, Integer>> wordCountDataSet = inputDataSet
                .flatMap(new MyFlatMapper())
                .groupBy(0)
                .sum(1);
        // Print the result
        wordCountDataSet.print();
    }

    private static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line into words
            String[] words = value.split(" ");
            // Wrap each word into a (word, 1) tuple and emit it
            for (String word : words) {
                // Skip empty tokens
                word = word.trim();
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
```
The program output is as follows:
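Since the result screenshot may not reproduce here, the counts the batch job produces from word.txt can be cross-checked with plain Java, without any Flink dependency. This is a sketch only; `WordCountSketch` is a made-up helper name, not part of the project:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {

    // Counts words in a line the same way MyFlatMapper + groupBy + sum do:
    // split on spaces, trim, skip empty tokens, accumulate per word.
    public static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : line.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String line = "洪七公 王重阳 欧阳锋 黄药师 洪七公 周伯通 郭靖 黄药师 黄蓉 林朝英";
        // 洪七公 and 黄药师 appear twice; every other name appears once
        count(line).forEach((w, c) -> System.out.println("(" + w + "," + c + ")"));
    }
}
```

Note that Flink may print the tuples in a different order than a HashMap iteration; only the counts are meaningful.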
The code of WordCountByStream.java, which processes the same word.txt through the DataStream API, is as follows:

```java
package cn.mfox;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountByStream {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the text file as a DataStream
        String inputPath = "src/main/java/cn/mfox/word.txt";
        DataStream<String> inputDataStream = env.readTextFile(inputPath);
        // Transform the stream: split into (word, 1) tuples, key by word, sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator = inputDataStream
                .flatMap(new MyFlatMapper())
                .keyBy(value -> value.f0)
                .sum(1);
        outputStreamOperator.print();
        // Start the job
        env.execute();
    }

    private static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line into words
            String[] words = value.split(" ");
            // Wrap each word into a (word, 1) tuple and emit it
            for (String word : words) {
                // Skip empty tokens
                word = word.trim();
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
```
The program output is as follows:
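Unlike the batch version, `keyBy(...).sum(1)` keeps a running total per key and emits an updated count for every incoming record, so a word that arrives twice is printed with count 1 and then count 2. A plain-Java sketch of that behavior (`KeyedSumSketch` is a made-up name for illustration, not a Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedSumSketch {

    // Per-key running totals, mimicking Flink's keyed sum state
    private final Map<String, Integer> state = new HashMap<>();

    // Mimics keyBy(value -> value.f0).sum(1): each incoming word updates
    // its key's state and the updated running total is emitted downstream.
    public int process(String word) {
        return state.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        KeyedSumSketch sketch = new KeyedSumSketch();
        for (String w : new String[]{"洪七公", "黄药师", "洪七公"}) {
            System.out.println("(" + w + "," + sketch.process(w) + ")");
        }
        // prints (洪七公,1) (黄药师,1) (洪七公,2) — one updated count per record
    }
}
```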
- Open a socket port in a shell window with the following command:

```shell
nc -lk 18888
```

- Type test data into the socket session; a screenshot is shown below:
Write the WordCountBySocketStream.java file; the code is as follows:

```java
package cn.mfox;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountBySocketStream {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parse --host and --port from the program arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        // Read data from the socket text stream
        DataStream<String> inputDataStream = env.socketTextStream(host, port);
        // Transform the stream: split into (word, 1) tuples, key by word, sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator = inputDataStream
                .flatMap(new MyFlatMapper())
                .keyBy(value -> value.f0)
                .sum(1);
        outputStreamOperator.print();
        // Start the job
        env.execute();
    }

    private static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line into words
            String[] words = value.split(" ");
            // Wrap each word into a (word, 1) tuple and emit it
            for (String word : words) {
                // Skip empty tokens
                word = word.trim();
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
```
- Run the Maven package command in the project's root directory:

```shell
mvn clean package
```

A screenshot of the process is shown below:
- Upload the jar in the Flink Web UI via the Submit New Job menu; the screenshot after a successful upload is shown below:

Click the uploaded jar in the UI, then enter the class to run along with the socket host and port:

```
cn.mfox.WordCountBySocketStream --host 192.168.18.88 --port 18888
```

The screenshot is shown below:
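As an alternative to the Web UI, the same job can be submitted from the command line with the Flink CLI. This is a sketch under two assumptions: the `flink` binary is on the PATH, and the jar name follows from the pom's artifactId and version:

```shell
# Submit the packaged job with the Flink CLI instead of the Web UI.
# -c selects the entry class; --host/--port are the program arguments.
flink run -c cn.mfox.WordCountBySocketStream \
    target/wordcount-demo-1.0-SNAPSHOT.jar \
    --host 192.168.18.88 --port 18888
```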
- In the Running Jobs menu you can check the execution of the submitted Flink job, such as the parallelism of each operator and the number of records received. The screenshot is shown below:

In the Task Managers menu you can view the log output of the Flink job. The screenshot is shown below:
3 Conclusion
This concludes the article.
As this is my first encounter with Flink and my own experience is limited, corrections and comments from readers are very welcome!



