1:事件驱动(Event-driven)
2:基于流处理
一切皆由流组成,离线数据是有界的流;实时数据是一个没有界限的流。(有界流、无界流)
3:分层API
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活
使用maven创建flink工程。添加依赖。
org.apache.flink
flink-java
1.10.1
org.apache.flink
flink-streaming-java_2.12
1.10.1
三:wordCount
离线处理:
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; // 自定义类,实现FlatMapFunction接口 // 第一个参数是接受的数据类型,第二个参数是输出结果的类型 class ImplFlatMapFunc implements FlatMapFunction> { @Override public void flatMap(String value, Collector > out) throws Exception { String[] words = value.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } } public class WordCount { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 读取文件文件数据 String inputPath = "G:\bigData\flink\file\hello.txt"; // 获取数据集 DataSet inputDataSet = env.readTextFile(inputPath); // groupBy(0) 按照第一个位置的word分组 // sum(1) 按照第二个位置的单词数聚合 DataSet > result = inputDataSet.flatMap(new ImplFlatMapFunc()).groupBy(0).sum(1); result.print(); } }
输出:
(scala,1) (you,2) (flink,1) (world,1) (hello,4) (and,1) (thank,1) (fine,1) (spark,1)
流处理环境:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置线程数,默认是电脑的核数。
// env.setParallelism(1);
// 读取文件
String inputPath = "G:\bigData\flink\file\hello.txt";
DataStream inputDataStream = env.readTextFile(inputPath);
// 按照keyBy(0) 第一个字段进行分组
// sum(1) 第一个字段进行数据聚合
DataStream> resultStream = inputDataStream.flatMap(new ImplFlatMapFunc())
.keyBy(0)
.sum(1);
resultStream.print();
// 开始执行任务
env.execute();
}
}
输出:
默认多线程情况下,流式计算是有状态的计算,会记录每一次的统计结果。8>表示不同的线程数。
8> (and,1) 5> (fine,1) 5> (you,1) 1> (scala,1) 5> (you,2) 3> (thank,1) 1> (spark,1) 7> (flink,1) 5> (world,1) 3> (hello,1) 3> (hello,2) 3> (hello,3) 3> (hello,4)
通过nc实现真正的流式处理
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class NCWordCount {
public static void main(String[] args) throws Exception {
// 设置流式运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置程序并行度
env.setParallelism(1);
// 接受程序运行时参数
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");
// 设置程序数据来源
DataStream inputDataStream = env.socketTextStream(host, port);
// 处理流式数据
DataStream> resultStream = inputDataStream.flatMap(new ImplFlatMapFunc())
.keyBy(0)
.sum(1);
// 打印数据
resultStream.print();
// 执行程序
env.execute();
}
}
三:Flink架构
3.1:运行时组件
3.1.1:JobManager
作业管理器
每个程序都有一个对应的JobManager,控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
JobManager会先接收到要执行的应用程序,这个应用程序会包括:
作业图(JobGraph)
逻辑数据流图(logical dataflow graph)
打包了所有的类、库和其它资源的JAR包。
JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
资源管理器.
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s以及standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
任务管理器
TaskManager的个数 = Job的最大并行度 / 每个TaskManager分配的任务槽数。
每个TaskManager分配的任务槽数可以通过--yarnslots参数来指定.
一个工作进程会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots,又称TasksSlot)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager可以向插槽分配任务(tasks)来执行了。
在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5TeRMJC7-1650611267401)(flink.assets/image-20210518120117278.png)]
3.1.4:Dispatcher 分发器
可以跨作业运行,它为应用提交提供了``REST``接口。 当一个应用被提交执行时,分发器就会启动并将应用移交给一个``JobManager``。由于是``REST``接口,所以``Dispatcher``可以作为集群的一个``HTTP``接入点,这样就能够不受防火墙阻挡。``Dispatcher``也会启动一个``Web UI``,用来方便地展示和监控作业执行的信息。 ``Dispatcher``在架构中可能并不是必需的,这取决于应用提交运行的方式。3.2:任务提交流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JiEEtant-1650611267403)(flink.assets/image-20210518113604910.png)]
ps:上图中7.指TaskManager为JobManager提供slots,8.表示JobManager提交要在slots中执行的任务给TaskManager。
3.3:yarn运行流程[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Tq0ROj66-1650611267404)(flink.assets/image-20210518113709980.png)]
- Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
- 之后客户端向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster
- ApplicationMaster启动后加载Flink的Jar包和配置构建环境,去启动JobManager,之后JobManager向Flink自身的RM进行申请资源,自身的RM向Yarn 的ResourceManager申请资源(因为是yarn模式,所有资源归yarn RM管理)启动TaskManager
- Yarn ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
- NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
-
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism),我们可以对单独的每个算子进行设置并行度,也可以直接用env设置全局的并行度,更可以在页面中去指定并行度。
-
最后,由于并行度是实际Task Manager处理task的能力,而一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度,则可以得出在设置slot时,在所有设置中的最大设置的并行度大小则就是所需要设置的slot的数量。(如果Slot分组,则需要为每组slot并行度最大值的和)
4.3.3 程序和数据流(DataFlow)
四:流处理 4.1:创建执行环境[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OcYnKj13-1650611267404)(flink.assets/image-20220103200902904.png)]
创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
4.2:Source 读取数据。
4.2.1:从集合中读取数据import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class Source_Collection {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据顺序输出
environment.setParallelism(1);
// 从集合中读取数据
DataStream streamSource = environment.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
// fromElements 读取数据
DataStream fromElements = environment.fromElements(
new SensorReading("se", 12L, 2.9),
new SensorReading("se_2", 12L, 2.9),
new SensorReading("se_7", 12L, 2.9),
new SensorReading("se_9", 12L, 2.9),
new SensorReading("se_11", 12L, 2.9));
// 设置流的名称
streamSource.print("data");
fromElements.print("int");
// 执行
environment.execute();
}
}
输出:
data> SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
data> SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
int> SensorReading{temperature=2.9, timestamp=12, id='se'}
data> SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_2'}
data> SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_7'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_9'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_11'}
4.2.3:从文件读取数据
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Source_File {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// 读取
DataStream textFile = environment.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
// 输出
textFile.print("TF");
// 执行
environment.execute();
}
}
4.2.4:从kafka读取数据
引入kafka依赖:
org.apache.flink flink-connector-kafka-0.11_2.12 1.10.1
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class Source_Kafka {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置kafka连接设置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9002");
// 连接kafka
DataStream textFile = environment.addSource(new FlinkKafkaConsumer011("sensor", new SimpleStringSchema(), properties));
// 输出
textFile.print("Kafka");
// 执行
environment.execute();
}
}
4.2.4:自定义数据源
会源源不断的生成新的数据。
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.HashMap;
import java.util.Random;
public class Source_Define {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream dataStream = environment.addSource(new MySensorSource());
dataStream.print();
environment.execute();
}
// 实现自定义的SourceFunction
public static class MySensorSource implements SourceFunction {
// 标示位,控制数据产生
private volatile boolean running = true;
@Override
public void run(SourceContext ctx) throws Exception {
//定义一个随机数发生器
Random random = new Random();
// 设置10个传感器的初始温度
HashMap sensorTempMap = new HashMap<>();
for (int i = 0; i < 10; ++i) {
sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
}
while (running) {
for (String sensorId : sensorTempMap.keySet()) {
// 在当前温度基础上随机波动
double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId, newTemp);
ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
}
// 控制输出频率
Thread.sleep(2000L);
}
}
@Override
public void cancel() {
this.running = false;
}
}
}
4.3:Transform
转换算子。
4.3.1:基本转换算子 map, flatMap, filter
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Transform_map {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream dataStream = env.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
// 返回每条数据的长度,使用lambda表达式
DataStream mapStream = dataStream.map(String::length);
// 切分每条数据
DataStream flatMapStream = dataStream.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] fields = value.split(",");
for (String field : fields) {
out.collect(field);
}
}
});
// 筛选是否以某个单词开头
DataStream filterStream = dataStream.filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("sensor_1");
}
});
// 打印输出
mapStream.print("map");
flatMapStream.print("flatMap");
filterStream.print("filter");
env.execute();
}
}
输出:
map> 24 flatMap> sensor_1 flatMap> 1547718199 flatMap> 35.8 filter> sensor_1,1547718199,35.8 map> 24 flatMap> sensor_1 flatMap> 1547718299 flatMap> 34.8 filter> sensor_1,1547718299,34.8 map> 24 flatMap> sensor_6 flatMap> 1547718201 flatMap> 15.4 map> 24 flatMap> sensor_6 flatMap> 1547718202 flatMap> 25.4 map> 23 flatMap> sensor_7 flatMap> 1547718202 flatMap> 6.7 map> 24 flatMap> sensor_7 flatMap> 1547718203 flatMap> 16.7 map> 25 flatMap> sensor_10 flatMap> 1547718205 flatMap> 38.1 filter> sensor_10,1547718205,38.1 map> 25 flatMap> sensor_10 flatMap> 1547718206 flatMap> 38.3 filter> sensor_10,1547718206,38.34.3.2:聚合算子
DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作。先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合).
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Transform_rolling {
public static void main(String[] args) throws Exception {
// 创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取数据
DataStream dataStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");
// 解析文件数据
DataStream sensorStream = dataStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 根据数据的id进行分组
KeyedStream keyedStream = sensorStream.keyBy(SensorReading::getId);
// 滚动聚合,max和maxBy区别在于,maxBy除了用于max比较的字段以外,其他字段也会更新成最新的,而max只有比较的字段更新,其他字段不变
DataStream resultStream = keyedStream.maxBy("temperature");
// 输出
keyedStream.print("keyedStream: ");
resultStream.print("resultStream: ");
// 执行
env.execute();
}
}
keyedStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
resultStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
keyedStream: > SensorReading{temperature=34.8, timestamp=1547718299, id='sensor_1'}
keyedStream: > SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
resultStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
keyedStream: > SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
keyedStream: > SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
resultStream: > SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
keyedStream: > SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
keyedStream: > SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
resultStream: > SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
keyedStream: > SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
resultStream: > SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
resultStream: > SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
resultStream: > SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
resultStream: > SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
4.3.3:reduce
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Transform_reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream dataStream = env.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
DataStream sensorStream = dataStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 分组
KeyedStream keyedStream = sensorStream.keyBy(SensorReading::getId);
// reduce,自定义规约函数,获取max温度的传感器信息以外,时间戳要求更新成最新的
DataStream resultStream = keyedStream.reduce(
(oldValue, newValue) -> new SensorReading(newValue.getId(), newValue.getTimestamp(), Math.max(oldValue.getTemperature(), newValue.getTemperature()))
);
resultStream.print("result");
env.execute();
}
}
4.3.4:多流转换算子
分流:
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Transform_splitStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
DataStream inputStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");
// 转换成SensorReading
DataStream dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
} );
// 创建分流标记
OutputTag high = new OutputTag("high"){
private static final long serialVersionUID = 1L;
};
OutputTag low = new OutputTag("low"){
private static final long serialVersionUID = 1L;
};
// 分流
SingleOutputStreamOperator processStream = dataStream.process(new ProcessFunction() {
@Override
public void processElement(SensorReading value, ProcessFunction.Context ctx,
Collector out) {
if (value.getTemperature() > 30) {
ctx.output(high, value);
}else {
ctx.output(low, value);
}
}
});
// 获取侧写流
DataStream highStream = processStream.getSideOutput(high);
DataStream lowStream = processStream.getSideOutput(low);
// 输出
highStream.print("high");
lowStream.print("low");
// 执行
env.execute("旁路分流");
}
}
合流:
import com.dzh.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Transform_connect {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取文件
DataStream inputStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");
// 转换成SensorReading数据流
DataStream dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 创建分流标记
OutputTag high = new OutputTag("high") {
private static final long serialVersionUID = 1L;
};
OutputTag low = new OutputTag("low") {
private static final long serialVersionUID = 1L;
};
// 分流
SingleOutputStreamOperator processStream = dataStream.process(new ProcessFunction() {
@Override
public void processElement(SensorReading value, ProcessFunction.Context ctx,
Collector out) {
if (value.getTemperature() > 30) {
ctx.output(high, value);
} else {
ctx.output(low, value);
}
}
});
// 获取侧写流
DataStream highStream = processStream.getSideOutput(high);
DataStream lowStream = processStream.getSideOutput(low);
highStream.print("high");
lowStream.print("low");
// 合并流
SingleOutputStreamOperator> warningStream = highStream.map(new MapFunction>() {
@Override
public Tuple2 map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
ConnectedStreams, SensorReading> streams = warningStream.connect(lowStream);
SingleOutputStreamOperator
输出:
high> SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
high> SensorReading{temperature=34.8, timestamp=1547718299, id='sensor_1'}
low> SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
low> SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
low> SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
low> SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
high> SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
high> SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
(sensor_1,35.8,high temp warning)
(sensor_6,normal)
(sensor_1,34.8,high temp warning)
(sensor_6,normal)
(sensor_10,38.1,high temp warning)
(sensor_7,normal)
(sensor_10,38.3,high temp warning)
(sensor_7,normal)
- Connect 的数据类型可以不同,Connect 只能合并两个流;
- Union可以合并多条流,Union的数据结构必须是一样的;
``Flink``没有类似于``spark``中``foreach``方法,让用户进行迭代的操作。虽有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。
4.5:Window streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
4.5.1:Windown的类型时间窗口(Time Window)
滚动时间窗口
滑动时间窗口
会话窗口
计数窗口(Count Window)
滚动计数窗口
滑动计数窗口
TimeWindow:按照时间生成Window。
CountWindow:按照指定的数据条数生成一个Window,与时间无关。
滚动窗口:
依据固定的窗口长度对数据进行切分
时间对齐,窗口长度固定,没有重叠
滑动窗口:
可以按照固定的长度向后滑动固定的距离
滑动窗口由固定的窗口长度和滑动间隔组成
可以有重叠(是否重叠和滑动距离有关系)
滑动窗口是固定窗口的更广义的一种形式,滚动窗口可以看做是滑动窗口的一种特殊情况(即窗口大小和滑动间隔相等)
会话窗口:
由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口。
特点:时间无对齐。
窗口分配器——window()方法
我们可以用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其他处理操作。
注意window()方法必须在keyBy之后才能使用。keyBy是一个分组函数。
Flink提供了更加简单的.timeWindow()和.countWindow()方法,用于定义时间窗口和计数窗口。
// windowAll 全部放入同一个分区 不建议使用
窗口函数
window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:
增量聚合函数(incremental aggregation functions)
全窗口函数(full window functions)
增量聚合函数
每条数据到来就进行计算,保持一个简单的状态。(来一条处理一条,但是不输出,到窗口临界位置才输出)
典型的增量聚合函数有ReduceFunction, AggregateFunction。
全窗口函数
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。(来一个放一个,窗口临界位置才遍历且计算、输出ProcessWindowFunction,WindowFunction
其他API
trigger() ——触发器定义window 什么时候关闭,触发计算并输出结果
evitor() ——移除器定义移除某些数据的逻辑
allowedLateness() ——允许处理迟到的数据
sideOutputLateData() ——将迟到的数据放入侧输出流
getSideOutput() ——获取侧输出流
Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行操作算子的本地系统时间,与机器相关;
Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
timeWindow(Time, Time) 调用滑动窗口
window() 自己传入窗口类型
window(EventTimeSessionWindows.withGap(Time.seconds(10))) 会话窗口
countWindow(long) 滚动
countWindow(long, long) 滑动
窗口函数
window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:
增量聚合函数(incremental aggregation functions)
全窗口函数(full window functions)
增量聚合函数
每条数据到来就进行计算,保持一个简单的状态。(来一条处理一条,但是不输出,到窗口临界位置才输出)
典型的增量聚合函数有ReduceFunction, AggregateFunction。
全窗口函数
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。(来一个放一个,窗口临界位置才遍历且计算、输出ProcessWindowFunction,WindowFunction
其他API
trigger() ——触发器定义window 什么时候关闭,触发计算并输出结果
evitor() ——移除器定义移除某些数据的逻辑
allowedLateness() ——允许处理迟到的数据
sideOutputLateData() ——将迟到的数据放入侧输出流
getSideOutput() ——获取侧输出流
Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行操作算子的本地系统时间,与机器相关;
Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。



