- 1. Flink 定义
- Streams
- State
- Time
- API
- 2. Flink 快速上手
- 创建 maven 工程
- wordcount
- 批处理
- 流处理
- 3.Flink DataStream 程序结构
Apache Flink is a framework and distributed process engine for stateful computations over unbounded and bounded data streams.
Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
StreamsFlink 认为,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流。
Bounded streams: 有界流,有始有终的数据集合,数据集合的大小是有限定的。(1TB 的数据)
Unbounded stream:无界流,有始无终的的数据流,数据的多少是不确定的。
State
无状态:数据来一条处理一条,数据无需留在系统中。
有状态:数据或数据的计算结果需要在系统中进行保留。
Time 在我们的传统编码过程中,并没有对于时间做严格的区分,往往是数据来一条便处理一条,但此时所处理的数据并不是数据产生的真实时间。数据需要通过网络进行传输,如果网络时延较大,程序处理距离数据产生的真实时间也就有了较大的时差。
Event Time:事件发生的时间。
Ingestion Time: 事件进入 Flink 的时间。
Process Time:Flink 开始处理事件的时间。
APIFlink 中的 API 分为一下三类:
- 越顶层越抽象,表达含义越简明,使用越方便。
- 越底层越具体,表达能力越丰富,使用越灵活。
wordcount4.0.0 com.tencent FlinkTutorial 1.0-SNAPSHOT org.apache.flink flink-java 1.10.1 org.apache.flink flink-streaming-java_2.12 1.10.1
有如下一个需求,通过 Flink 统计单词出现的个数。
批处理hello.txt 文件内容如下:
hello flink hello spark hello datahub hello java
批处理代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
// 批处理 word count
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
String inputPath = "hello.txt";
DataSet inputDataSet = env.readTextFile(inputPath);
// 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计
DataSet> resultSet = inputDataSet.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
// 按空格分词
String[] words = value.split(" ");
// 遍历所有 word,包成二元组输出
for (String word : words) {
out.collect(new Tuple2(word, 1));
}
}
})
// 按照第一个位置的 word 进行分组
.groupBy(0)
// 将第二个位置上的数据求和
.sum(1);
resultSet.print();
}
}
流处理
流处理代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
String inputPath = "hello.txt";
DataStream inputDataStream = env.readTextFile(inputPath);
// 基于数据流进行转换计算
DataStream resultStream = inputDataStream.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
// 按空格分词
String[] words = value.split(" ");
// 遍历所有 word,包成二元组输出
for (String word : words) {
out.collect(new Tuple2(word, 1));
}
}
})
// 按照第一个位置的 word 进行分组
.keyBy(0)
// 将第二个位置上的数据求和
.sum(1);
resultStream.print();
// 执行任务
env.execute();
}
}
可以看到,相较于批处理而言,流处理的输出在变化,每来一个单词,都在之前单词的基础上进行了一次重新计算。
上述流处理的代码是从文件中读取数据,为了更加直观的体验流处理,我们通过 Socket 的方式读取内容,这样就更加符合流处理的场景了,不确定数据何时到以及数据的多少。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SocketWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 用 ParameterTool 工具从程序启动参数中提取配置项
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
DataStream inputDataStream = env.socketTextStream(host, port);
// 基于数据流进行转换计算
DataStream resultStream = inputDataStream.flatMap(new FlatMapFunction>() {
public void flatMap(String value, Collector> out) throws Exception {
// 按空格分词
String[] words = value.split(" ");
// 遍历所有 word,包成二元组输出
for (String word : words) {
out.collect(new Tuple2(word, 1));
}
}
})
// 按照第一个位置的 word 进行分组
.keyBy(0)
// 将第二个位置上的数据求和
.sum(1);
resultStream.print();
// 执行任务
env.execute();
}
}
在 IDE 中配置启动参数,如下:
--host localhost --port 7777
首先在终端中监听 7777 端口,然后启动程序。
接着通过终端发送消息,观察输出。
可以看到,已经有输出了,我们接着输入。
程序已经成功输出了每个单词出现的次数。
这里你可能有一个疑问,就是单词前面出现的数字是啥。这是因为我们知道 Flink 是分布式的,即有多个节点并行运行,这些数字是这节点的编号。
我们可以通过 env.setParallelism(1); 将并行度设置为 1,这样就不会有多个节点运行。
重新运行程序,可以看到不再有节点的标示数字了。
通过上述 Flink 的体验,可以得知 Flink DataStream 程序结构主要分为以下几个步骤:
1、设置运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、配置数据源读取数据
DataStreamtext = env.readTextFile("input");
3、进行一系列转换
DataStream> counts = text.flatMap(...).keyBy(0).sum(1);
4、配置数据汇写出数据
counts.writerAsText("output");
5、提交执行(一定别忘记执行,否则程序并不会执行)
env.execute("Streaming WordCount");



