- 一 、Flink入门
- 1.1 Flink简介
- 1.1.1 处理无界和有界数据
- 1.1.2 部署应用到任意地方
- 1.1.3 运行任意规模应用
- 1.1.4 利用内存性能
- 1.2 Flink架构图
- 1.3 入门案例演示
- 1.3.1 pom文件
- 1.3.2 单词计数案例演示
- 1.3.3 使用面向对象
- 1.3.4 使用最佳实践
- 1.3.5 抽离业务功能
- 1.4 Flink核心概念
- 1.4.1 Flink概念
- 1.4.2 本地观察Flink任务
- 1.4.3 并行度
- 1.4.4 数据传输策略
- 1.4.5 Operator Chain
- 1.5 Flink分布式运行环境
- 1.5.1 flink分布式四层模型
- 1.5.2 Flink任务分布式运行流程
- 二、DataStream API
- 2.1 Flink之数据源
- 2.1.1 source简介
- 2.1.2 数据源之collection
- 2.1.3 自定义单并行度数据源
- 2.1.4 自定义多并行度数据源
- 2.2 常见Transformation操作
- 2.2.1 map和filter
- 2.2.2 flatMap,keyBy和sum
- 2.2.3 union
- 2.2.4 connect, conMap和conFlatMap
- 2.2.5 Split和Select
- 2.3 常见sink操作
- 2.3.1 print() / printToErr()
- 2.3.2 writeAsText()
- 2.3.3 Flink提供的sink
- 三、DataSet API
- 3.1 source
- 3.2 transform
- 3.2.1. 算子概览
- 3.2.3. MapPartition
- 3.2.4. distinct
- 3.2.5. join
- 3.2.6. OutJoin
- 3.2.7. Cross
- 3.2.8. First-n 和 SortPartition
- 3.2.9. partition
- 3.3 sink
- 3.4 Flink之广播变量
- 3.5 Flink之Counter(计数器)
- 下载Flink版本 https://mirror.bit.edu.cn/apache/flink/flink-1.10.2/flink-1.10.2-bin-scala_2.11.tgz
Apache Flink® — Stateful Computations over Data Streams
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
Spark 很多技术参考了 Flink, 那为什么Spark更流行?
- Spark有一个好的产品经理
- Spark官方网站、Spark的任务界面、Spark Shell
Spark:顶层设计,认为 所有的数据都是批处理。
如果数据量很大,处理速度很慢 ---> 离线处理任务
数据量比较小,处理速度很快 ---> 微批处理 ,近实时处理任务 Sparking streaming
Flink:顶层设计,认为 所有的数据都是流。
流是源源不断的,无界 ---> 实时处理任务
10分钟,1小时,有界 ---> 离线处理任务
1.1.1 处理无界和有界数据
任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
数据可以被作为 无界 或者 有界 流来处理。
- 无界流(实时流,实时的程序)
- 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。
- 无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
- 有界流
- 有定义流的开始,也有定义流的结束。
- 有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。
Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得Flink的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
1.1.2 部署应用到任意地方Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。
Flink集成了所有常见的集群资源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行(standalone)。
Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resourcemanager-specific)的部署模式实现的。Flink可以采用与当前资源管理器相适应的方式进行交互。
部署 Flink 应用程序时,Flink会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST调用进行的,这可以简化Flink与各种环境中的集成。
1.1.3 运行任意规模应用Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。
Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字:
- 每天处理数万亿的事件
- 可以维护几TB大小的状态
- 可以部署上千个节点的集群
有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。
1.2 Flink架构图 1.3 入门案例演示 1.3.1 pom文件- 官网建议使用IDEA,集成Scala和Maven比较方便
- pom.xml文件指定
1.3.2 单词计数案例演示1.10.1 2.11.8 1.8 1.8 org.apache.flink flink-streaming-java_2.11 ${flink.version}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import scala.Tuple2;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 获取数据源
DataStreamSource dataStream = env.socketTextStream("192.168.8.110", 9999);
// 3. 数据处理
SingleOutputStreamOperator> wordAndOne = dataStream.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String line, Collector> collector) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
collector.collect(new Tuple2<>(word, 1));
}
}
});
SingleOutputStreamOperator> wordCount = wordAndOne.keyBy(0).sum(1);
// 4. 数据输出
wordCount.print();
// 5. 启动任务
env.execute("word count ....");
}
}
1.3.3 使用面向对象
把数据看成对象,遇到字段较多的数据操作比较方便
public class WordCount {
public static void main(String[] args) throws Exception {
// 1 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2 获取数据源
DataStreamSource dataStream = env.socketTextStream("192.168.152.102", 9999);
// 3 数据处理
SingleOutputStreamOperator wordCount = dataStream.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String line, Collector collector) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
collector.collect(new WordAndCount(word, 1));
}
}
}).keyBy("word")
.sum("count");
// 4 数据输出
wordCount.print();
// 5 启动任务
env.execute("word count ...");
}
public class WordAndCount {
private String word;
private int count;
// getter, setter, toString, 无参构造,有参构造
}
}
- 先使用命令启动 netcat网络服务端
- nc -l -p 9999
- 再启动代码
flink建议如果程序中需要传入参数,使用它提供的ParameterTool
public class WordCount {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 获取数据源
// 使用 flink提供的工具类,获取传递的参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname = parameterTool.get("hostname");
int port = parameterTool.getInt("port");
DataStreamSource dataStream = env.socketTextStream(hostname, port);
// 3. 数据处理
// ...
}
}
1.3.5 抽离业务功能
工作中开发复杂功能模块,习惯把业务算子抽离出来单独开发,这样代码结构会比较清晰
/// 分割单词 public class SplitWord implements FlatMapFunction{ @Override public void flatMap(String line, Collector collector) throws Exception { String[] fields = line.split(","); for (String word : fields) { collector.collect(new WordAndCount(word, 1)); } } }
public class WordCount {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 获取数据源
// 使用 flink提供的工具类,获取传递的参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname = parameterTool.get("hostname");
int port = parameterTool.getInt("port");
DataStreamSource dataStream = env.socketTextStream(hostname, port);
// 3. 数据处理
SingleOutputStreamOperator wordCount = dataStream
.flatMap(new SplitWord())
.keyBy("word")
.sum("count");
// 4. 数据输出
wordCount.print();
// 5. 启动任务
env.execute("word count ....");
}
}
1.4 Flink核心概念
1.4.1 Flink概念
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams
- 核心概念之stateful
- flink: 有状态的流
- SparkStreaming: 没有状态
- 2秒运行一次: hadoop,hadoop,hive --> hadoop2,hive1
- 2秒运行一次: hadoop,hive,hive --> hadoop1,hive2
- Sparktreaming 有2个算子是有状态的
- mapWithState, updateStateByKey
- 里面的状态使用起来不是很灵活
- 核心概念之Operator
- source
- transfrom
- sink
- 核心概念之distributed
- 每个算子都可以并行
- pom.xml中添加如下依赖
org.apache.flink flink-runtime-web_2.11 ${flink.version}
- 修改获取执行环境的代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
- 运行程序, 程序运行起来打开 http://localhost:8081/
为什么没有设置并行度,wordcount的并行度是8?
- 电脑cpu是8核,所以并行度是8
- source是socket, 只有1个端口。所以source的并行度是1。
- 设置全局并行度为2
env.setParallelism(2);
- 设置sink Operator并行度
wordCount.print().setParallelism(1);
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Gygeg7eJ-1633917012318)(assets/C1142570B79F477D832DC652CEB38CCC.png)]
- Flink架构
Flink的架构是主从式的架构, 主节点叫:JobManager,从节点叫 TaskManager 。
3个节点部署Flink集群。
- 任务分布式运行
- Flink分布式任务调度
- forward strategy
- 一个 task 的输出只发送给一个 task 作为输入;
- 如果两个 task 都在一个 JVM 中的话,那么就可以避免网络开销;
- key based strategy
- 数据需要按照某个属性(key)进行分组(或者说分区);
- 相同 key 的数据需要传输给同一个 task,在一个 task 中进行处理;
- broadcast strategy
- random strategy
- 数据随机的从一个 task 中传输给下一个 operator 所有的 subtask;
- 保证数据能均匀的传输给所有的 subtask;
TaskManager的并行度 与 Task的数据传播策略的关系
1.4.5 Operator Chain- Flink与Kafka版本整合
添加如下依赖:
org.apache.flink flink-connector-kafka-0.11_2.11 ${flink.version}
public class KafkaSourceWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String topic = "testSlot";
Properties comsumerProperties = new Properties();
comsumerProperties.setProperty("bootstrap.servers","192.168.15.102:9092");
comsumerProperties.setProperty("group.id","testSlot_consumer");
FlinkKafkaConsumer011 myConsumer =
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), comsumerProperties);
DataStreamSource data = env.addSource(myConsumer).setParallelism(3);
SingleOutputStreamOperator wordoneStream = data.flatMap(new SplitWord()).setParallelism(2) ;
SingleOutputStreamOperator result = wordOneStream.keyBy(0).sum(1).setParallelism(2);
result.map(tuple -> tuple.toString()).setParallelism(2).print().setParallelism(1);
env.execute("wordCount2");
}
}
- dataflow效果图如下
注:演示资源不足,修改资源以后,任务正常运行
- Operator Chain
Operator Chain的条件:
- 数据传输策略是 forward strategy
- 在同一个 TaskManager 中运行
并行度都设置为1,观察情况
并行度设置为2,观察情况
1.5 Flink分布式运行环境 1.5.1 flink分布式四层模型并行度设置为3,观察情况
Flink 代码开发就是要构建一个 dataflow,这个 dataflow 运行需要经历如下 4 个阶段:
- Stream Graph
- Job Graph
- Execution Graph
- Physical Execution Graph
source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
flink提供了大量的已经实现好的source方法,也可以自定义source:
- 通过实现sourceFunction接口来自定义无并行度的source
- 通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction来自定义有并行度的source
大多数情况下,我们使用自带的source即可。
获取source的方式(自带的)
1. 基于文件 - `readTextFile(path)` - 读取文本文件,文件遵循`TextInputFormat`读取规则,逐行读取并返回。2. 基于socket - `socketTextStream` - 从socker中读取数据,元素可以通过一个分隔符切开。3. 基于集合 - `fromCollection(Collection)` - 通过java 的`collection`集合创建一个数据流,集合中的所有元素必须是相同类型的。4. 扩展数据源 - `addSource` 可以实现读取第三方数据源的数据 - 系统内置提供了一批`connectors`,连接器会提供对应的source支持【kafka】
扩展的数据源
- Apache Kafka (source/sink) 后面重点分析
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
public class StreamingSourceFromCollection {
public static void main(String[] args) throws Exception {
// 1. 获取环境变量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 模拟数据
ArrayList data = new ArrayList<>();
data.add("hadoop");
data.add("spark");
data.add("flink");
// 3. 获取数据源
DataStreamSource dataStream = env.fromCollection(data);
// 4. transfrom action操作
SingleOutputStreamOperator addPreStream =
dataStream.map(new MapFunction() {
@Override
public String map(String word) throws Exception {
return "kaikeba_" + word;
}
});
// 5. 对结果进行处理(打印到控制台)
addPreStream.print().setParallelism(1);
// 6. 启动程序
env.execute("StreamingSourceFromCollection");
}
}
2.1.3 自定义单并行度数据源
public class MyNoParalleSource implements SourceFunction{ private long number = 1l; private boolean isRunning = true; @Override public void run(SourceContext sct) throws Exception { while(isRunning){ sct.collect(number); number++; // 每秒生成一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }
public class StreamingDemoWithMyNoPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator dataStream =
numberStream.map(new MapFunction() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到了数据:" + value);
return value;
}
});
SingleOutputStreamOperator filterDataStream =
dataStream.filter(new FilterFunction() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
运行结果:
2.1.4 自定义多并行度数据源public class MyParalleSource implements ParallelSourceFunction{ private long number = 1l; private boolean isRunning = true; @Override public void run(SourceContext sct) throws Exception { while(isRunning){ sct.collect(number); number++; // 每秒生成一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }
public class StreamingDemoWithMyParalalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 修改数据源,设置并行度
DataStreamSource numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
SingleOutputStreamOperator dataStream =
numberStream.map(new MapFunction() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到了数据:" + value);
return value;
}
});
SingleOutputStreamOperator filterDataStream =
dataStream.filter(new FilterFunction() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyParalalleSource");
}
}
运行结果:
2.2 常见Transformation操作 2.2.1 map和filter
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator dataStream =
numberStream.map(new MapFunction() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到了数据:" + value);
return value;
}
});
SingleOutputStreamOperator filterDataStream =
dataStream.filter(new FilterFunction() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("MapDemo");
}
}
2.2.2 flatMap,keyBy和sum
经过测试,发现问题在于POJO类的定义上。对于要充当key的POJO类,必须满足以下条件:
- 字段名必须声明为public的;
- 必须有默认的无参构造器;
- 所有构造器必须声明为public的。
public class WordCount {
public String word;
public long count;
public WordCount() {
}
public WordCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + ''' +
", count=" + count +
'}';
}
}
public class WindowWordCountJava {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
int port = parameterTool.getInt("port");
String hostname = parameterTool.get("hostname");
String delimiter = "t";
// 1. 获取flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 获取数据源
DataStreamSource textStream = env.socketTextStream(hostname, port, delimiter);
// 3. 执行transformation操作
SingleOutputStreamOperator wordCountStream = textStream.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String line, Collector collector) throws Exception {
String[] fileds = line.split("t");
for (String word : fileds) {
collector.collect(new WordCount(word, 1l));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1)) // 每隔1秒计算最近2秒的数据
.sum("count");
wordCountStream.print().setParallelism(1);
// 4. 运行程序
env.execute("WindowWordCountJava");
}
}
2.2.3 union
// 合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的
public class UnionDemo {
public static void main(String[] args) throws Exception {
// 1. 获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 获取数据源
DataStreamSource text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
DataStreamSource text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
// 3. 把 text1 和 text2 组装到一起
DataStream text = text1.union(text2);
DataStream num = text.map(new MapFunction() {
@Override
public Long map(Long value) throws Exception {
System.out.println("原始接收到数据:" + value);
return value;
}
});
// 4. 每2秒钟处理一次数据
DataStream sum = num.timeWindowAll(Time.seconds(2)).sum(0);
// 5. 打印结果
sum.print().setParallelism(1);
env.execute(UnionDemo.class.getSimpleName());
}
}
2.2.4 connect, conMap和conFlatMap
// 和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
public class ConnectionDemo {
public static void main(String[] args) throws Exception {
// 获取flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取数据源
DataStreamSource text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
DataStreamSource text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator text2_str = text2.map(new MapFunction() {
@Override
public String map(Long value) throws Exception {
return "str_" + value;
}
});
ConnectedStreams connectStream = text1.connect(text2_str);
SingleOutputStreamOperator
2.2.5 Split和Select
public class SplitDemo {
public static void main(String[] args) throws Exception {
// 获取flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取数据源
DataStreamSource text = env.addSource(new MyNoParalleSource()).setParallelism(1);
// 对流进行切分,按照数据的奇偶性进行区分
SplitStream splitStream = text.split(new OutputSelector() {
@Override
public Iterable select(Long value) {
ArrayList outPut = new ArrayList<>();
if (value % 2 == 0) {
outPut.add("even"); // 偶数
} else {
outPut.add("odd"); // 奇数
}
return outPut;
}
});
// 选择一个或者多个切分后的流
DataStream evenStream = splitStream.select("even");
DataStream oddStream = splitStream.select("odd");
DataStream moreStream = splitStream.select("odd","even");
// 打印结果
evenStream.print().setParallelism(1);
env.execute(SplitDemo.class.getSimpleName());
}
} }}
2.3 常见sink操作
2.3.1 print() / printToErr()
打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。
2.3.2 writeAsText()
public class WriteTextDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator dataStream = numberStream.map(new MapFunction() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到了数据:" + value);
return value;
}
});
SingleOutputStreamOperator filterDataStream = dataStream.filter(new FilterFunction() {
@Override
public boolean filter(Long value) throws Exception {
return value % 2 == 0;
}
});
filterDataStream.writeAsText("C:\test.txt").setParallelism(1);
env.execute(WriteTextDemo.class.getSimpleName());
}
}
2.3.3 Flink提供的sink
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
1、基于文件 readTextFile(path) 2、基于集合 fromCollection(Collection)3.2 transform 3.2.1. 算子概览
- Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
- FlatMap:输入一个元素,可以返回零个,一个或者多个元素
- MapPartition:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源链接,建议使用MapPartition】
- Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
- Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
- Aggregate:sum、max、min等
- Distinct:返回一个数据集中去重之后的元素,data.distinct()
- Join:内连接
- OuterJoin:外链接
- Cross:获取两个数据集的笛卡尔积
- Union:返回两个数据集的总和,数据类型需要一致
- First-n:获取集合中的前N个元素
- Sort Partition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用来完成对多个字段的排序
public class MapPartitionDemo {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList data = new ArrayList<>();
data.add("hello you");
data.add("hello me");
DataSource text = env.fromCollection(data);
// map(): 一条数据一条数据的处理
// mapPartition(): 一个分区一个分区的处理
DataSet mapPartitionData = text.mapPartition(new MapPartitionFunction() {
@Override
public void mapPartition(Iterable values, Collector collector) throws Exception {
//获取数据库连接--注意,此时是一个分区的数据获取一次连接【优点,每个分区获取一次链接】
//values中保存了一个分区的数据
//处理数据
Iterator it = values.iterator();
while (it.hasNext()) {
String next = it.next();
String[] split = next.split("\W+");
for (String word : split) {
collector.collect(word);
}
}
}
// 关闭连接
});
mapPartitionData.print();
env.execute(MapPartitionDemo.class.getSimpleName());
}
}
3.2.4. distinct
// 对数据进行去重
public class DistinctDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList data = new ArrayList<>();
data.add("you jump");
data.add("i jump");
DataSource text = env.fromCollection(data);
FlatMapOperator flatMapData = text.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector collector) throws Exception {
String[] split = value.toLowerCase(Locale.ROOT).split("\W+");
for (String word : split) {
System.out.println("单词:" + word);
collector.collect(word);
}
}
});
// 对数据进行整体去重
flatMapData.distinct().print();
}
}
3.2.5. join
// 对数据进行join
public class joinDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// tuple2<用户id,用户姓名
ArrayList> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
// tuple2<用户id,用户所在城市>
ArrayList> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"beijing"));
data2.add(new Tuple2<>(2,"shanghai"));
data2.add(new Tuple2<>(3,"guagnzhou"));
DataSource> text1 = env.fromCollection(data1);
DataSource> text2 = env.fromCollection(data2);
text1.join(text2).where(0) //指定第一个数据集 需要进行比较的元素角标
.equalTo(0) //指定第二个数据集 需要进行比较的元素角标
.with(new JoinFunction, Tuple2, Tuple3>() {
@Override
public Tuple3 join(Tuple2 first,
Tuple2 second) throws Exception {
return new Tuple3<>(first.f0, first.f1,second.f1);
}
}).print();
System.out.println("==============");
}
}
3.2.6. OutJoin
public class OutJoinDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// tuple2<用户id,用户姓名
ArrayList> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
// tuple2<用户id,用户所在城市>
ArrayList> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"beijing"));
data2.add(new Tuple2<>(2,"shanghai"));
data2.add(new Tuple2<>(3,"guagnzhou"));
DataSource> text1 = env.fromCollection(data1);
DataSource> text2 = env.fromCollection(data2);
text1.leftOuterJoin(text2).where(0)
.equalTo(0)
.with(new JoinFunction, Tuple2, Tuple3>() {
@Override
public Tuple3 join(Tuple2 first,
Tuple2 second) throws Exception {
if(second == null){
return new Tuple3<>(first.f0,first.f1,"null");
}
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}).print();
System.out.println("==================================");
text1.rightOuterJoin(text2).where(0)
.equalTo(0)
.with(new JoinFunction, Tuple2, Tuple3>() {
@Override
public Tuple3 join(Tuple2 first,
Tuple2 second) throws Exception {
if(first == null){
return new Tuple3<>(second.f0,"null",second.f1);
}
return new Tuple3<>(second.f0,first.f1,second.f1);
}
}).print();
System.out.println("==================================");
text1.fullOuterJoin(text2).where(0)
.equalTo(0)
.with(new JoinFunction, Tuple2, Tuple3>() {
@Override
public Tuple3 join(Tuple2 first,
Tuple2 second) throws Exception {
if(first == null){
return new Tuple3<>(second.f0,"null",second.f1);
}else if(second == null){
return new Tuple3<>(first.f0, first.f1,"null");
}
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}).print();
System.out.println("==================================");
}
}
3.2.7. Cross
// 笛卡尔积
public class CrossDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList data1 = new ArrayList<>();
data1.add("zs");
data1.add("ls");
data1.add("ww");
ArrayList data2 = new ArrayList<>();
data2.add(1);
data2.add(2);
data2.add(3);
DataSource text1 = env.fromCollection(data1);
DataSource text2 = env.fromCollection(data2);
CrossOperator.DefaultCross cross = text1.cross(text2);
cross.print();
}
}
3.2.8. First-n 和 SortPartition
// TopN
public class FirstDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList> data = new ArrayList<>();
data.add(new Tuple2<>(2,"zs"));
data.add(new Tuple2<>(4,"ls"));
data.add(new Tuple2<>(3,"ww"));
data.add(new Tuple2<>(1,"xw"));
data.add(new Tuple2<>(1,"aw"));
data.add(new Tuple2<>(1,"mw"));
DataSource> text = env.fromCollection(data);
// 获取前3条数据,按照数据插入的顺序
text.first(3).print();
System.out.println("==================================");
// 根据数据中的第一列进行分组,获取每组的前2个元素
text.groupBy(0).first(2).print();
System.out.println("==================================");
// 根据数据中的第一列分组,再根据第二列进行组内排序[升序],获取每组的前2个元素
text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
System.out.println("==================================");
// 不分组,全局排序获取集合中的前3个元素,针对第一个元素升序,第二个元素倒序
text.sortPartition(0, Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print();
}
}
3.2.9. partition
public class HashRangePartitionDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList> data = new ArrayList<>();
data.add(new Tuple2<>(1,"hello1"));
data.add(new Tuple2<>(2,"hello2"));
data.add(new Tuple2<>(2,"hello3"));
data.add(new Tuple2<>(3,"hello4"));
data.add(new Tuple2<>(3,"hello5"));
data.add(new Tuple2<>(3,"hello6"));
data.add(new Tuple2<>(4,"hello7"));
data.add(new Tuple2<>(4,"hello8"));
data.add(new Tuple2<>(4,"hello9"));
data.add(new Tuple2<>(4,"hello10"));
data.add(new Tuple2<>(5,"hello11"));
data.add(new Tuple2<>(5,"hello12"));
data.add(new Tuple2<>(5,"hello13"));
data.add(new Tuple2<>(5,"hello14"));
data.add(new Tuple2<>(5,"hello15"));
data.add(new Tuple2<>(6,"hello16"));
data.add(new Tuple2<>(6,"hello17"));
data.add(new Tuple2<>(6,"hello18"));
data.add(new Tuple2<>(6,"hello19"));
data.add(new Tuple2<>(6,"hello20"));
data.add(new Tuple2<>(6,"hello21"));
DataSource> text = env.fromCollection(data);
text.partitionByRange(0).mapPartition(new MapPartitionFunction, Tuple2>() {
@Override
public void mapPartition(Iterable> values, Collector> collector) throws Exception {
Iterator> it = values.iterator();
while(it.hasNext()){
Tuple2 next = it.next();
System.out.println("当前进程id " + Thread.currentThread().getId() +" ---> " + next);
}
}
}).print();
}
} }).print(); }}
输出结果
当前进程id 117 ---> (1,hello1) 当前进程id 118 ---> (2,hello2) 当前进程id 118 ---> (2,hello3) 当前进程id 120 ---> (3,hello4) 当前进程id 120 ---> (3,hello5) 当前进程id 120 ---> (3,hello6) 当前进程id 123 ---> (4,hello7) 当前进程id 123 ---> (4,hello8) 当前进程id 123 ---> (4,hello9) 当前进程id 123 ---> (4,hello10) 当前进程id 125 ---> (5,hello11) 当前进程id 125 ---> (5,hello12) 当前进程id 125 ---> (5,hello13) 当前进程id 125 ---> (5,hello14) 当前进程id 125 ---> (5,hello15) 当前进程id 130 ---> (6,hello16) 当前进程id 130 ---> (6,hello17) 当前进程id 130 ---> (6,hello18) 当前进程id 130 ---> (6,hello19) 当前进程id 130 ---> (6,hello20) 当前进程id 130 ---> (6,hello21)3.3 sink
- writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
- writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法
- print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
- 广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks。
- 广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
- 另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的
- 一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
- 如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
用法
1:初始化数据 DataSettoBroadcast = env.fromElements(1, 2, 3) 2:广播数据 withBroadcastSet(toBroadcast, "broadcastSetName"); 3:获取数据 Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意:
- 广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。
- 广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。
public class BroadCastDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1:准备需要广播的数据
ArrayList> broadcast = new ArrayList<>();
broadcast.add(new Tuple2<>("zs", 18));
broadcast.add(new Tuple2<>("ls", 20));
broadcast.add(new Tuple2<>("ww", 17));
DataSource> tupleData = env.fromCollection(broadcast);
//1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是 用户年龄
MapOperator, HashMap> toBroadcast =
tupleData.map(new MapFunction, HashMap>() {
@Override
public HashMap map(Tuple2 value) throws Exception {
HashMap res = new HashMap<>();
res.put(value.f0, value.f1);
return res;
}
});
// 源数据
DataSource data = env.fromElements("zs", "ls", "ww");
// 注意:在这里需要使用到RichMapFunction获取广播变量
MapOperator result = data.map(new RichMapFunction() {
List> broadCastMap = new ArrayList<>();
HashMap allMap = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
for (HashMap map : broadCastMap) {
allMap.putAll(map);
}
}
@Override
public String map(String value) throws Exception {
Integer age = allMap.get(value);
return value + " : " + age;
}
}).withBroadcastSet(toBroadcast, "broadCastMapName"); //2:执行广播数据的操作
result.print();
}
}
输出结果:
zs : 18 ls : 20 ww : 173.5 Flink之Counter(计数器)
Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数
据变化可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter是一个具体的累加器(Accumulator)实现IntCounter, LongCounter 和 DoubleCounter
用法
1:创建累加器
private IntCounter numLines = new IntCounter();
2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:使用累加器
this.numLines.add(1);
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")
// 计数器
public class CounterDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource data = env.fromElements("a", "b", "c", "d");
MapOperator result = data.map(new RichMapFunction() {
// 1:创建累加器
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
}
@Override
public String map(String value) throws Exception {
// 如果并行度为1,使用普通的累加求和即可,
// 但是设置多个并行度,则普通的累加求和结果就不准了
this.numLines.add(1);
return value;
}
}).setParallelism(8);
// 如果要获取counter的值,只能是任务
result.writeAsText("C:\test.txt");
JobExecutionResult jobResult = env.execute("Counter");
// 3:获取累加器
int num = jobResult.getAccumulatorResult("num-lines");
System.out.println("num: " + num);
}
}



