1、DataStream 和 Table/SQL 混合应用的批执行模式
在 Flink 1.14 中,有界的批执行模式的 SQL/Table 应用可将其中间数据表转换成数据流,经过由 DataStream API 定义的算子处理,再转换回数据表。其内部原理是,Flink 构建了一个由优化的声明式 SQL执行和 DataStream 批执行混合而成的数据流 DAG
//创建流执行环境 和 表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//创建一个数据流
DataStream dataStream = env.fromElements("Alice", "Bob", "John");
//将仅插入流解释为表
Table inputTable = tableEnv.fromDataStream(dataStream);
//将表对象注册为视图并查询它
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECt UPPER(f0) FROM InputTable");
//再次将仅插入表解释为数据流(DataStream)
DataStream resultStream = tableEnv.toDataStream(resultTable);
//接收打印接收器并在DataStream API中执行
resultStream.print();
env.execute();
2、混合Source
全新的Source能够依次地从多个数据源读取数据,在不同数据源之间无缝切换,产出一条由来自多条数据源的数据合并而成的数据流
例如:
引导用例可能需要从 S3 读取几天的有界输入,
然后才能继续读取来自 Kafka 的最新无界输入。
当有界文件输入完成而不中断应用程序时,HybridSource 从 FileSource 切换到 KafkaSource。
org.apache.flink
flink-connector-base
1.14.0
2.1 固定起始位置
// 切换时间戳
long switchTimestamp = ...;
//读取文件数据,有界流
FileSource fileSource = FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
//读取kafka数据,从指定时间开始消费
KafkaSource kafkaSource =
KafkaSource.builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
.build();
//混合源 构建 文件源 添加kafka源
HybridSource hybridSource =
HybridSource.builder(fileSource)
.addSource(kafkaSource)
.build();
2.2 动态切换位置
//自定义文件源
FileSource fileSource = CustomFileSource.readTillOneDayFromLatest();
HybridSource hybridSource =
HybridSource.builder(fileSource) //给这个文件源添加枚举类
.addSource(
switchContext -> { //上下文
//自定义文件拆分枚举器
CustomFileSplitEnumerator previousEnumerator = //获取以前的枚举器
switchContext.getPreviousEnumerator(); //从上下文总获取上一个枚举器
//如何获取时间戳取决于特定的枚举器
long switchTimestamp = previousEnumerator.getEndTimestamp(); //从以前的枚举器获取结束时间戳当做 不同数据源的切换时间戳
//根据切换时间戳读取kafka数据源数据
KafkaSource kafkaSource =
KafkaSource.builder()
//设置开始偏移量 偏移量初始化程序的时间戳为切换时间戳 +1
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
.build();
return kafkaSource;
},
Boundedness.CONTINUOUS_UNBOUNDED)
.build();