栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink1.14的混合源

flink1.14的混合源

1、DataStream 和 Table/SQL 混合应用的批执行模式
在 Flink 1.14 中,有界的批执行模式的 SQL/Table 应用可将其中间数据表转换成数据流,经过由 DataStream API 定义的算子处理,再转换回数据表。其内部原理是,Flink 构建了一个由优化的声明式 SQL执行和 DataStream 批执行混合而成的数据流 DAG
//创建流执行环境 和 表执行环境 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//创建一个数据流
DataStream dataStream = env.fromElements("Alice", "Bob", "John");

//将仅插入流解释为表
Table inputTable = tableEnv.fromDataStream(dataStream);

//将表对象注册为视图并查询它
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECt UPPER(f0) FROM InputTable");

//再次将仅插入表解释为数据流(DataStream)
DataStream resultStream = tableEnv.toDataStream(resultTable);

//接收打印接收器并在DataStream API中执行
resultStream.print();
env.execute();
2、混合Source
	全新的Source能够依次地从多个数据源读取数据,在不同数据源之间无缝切换,产出一条由来自多条数据源的数据合并而成的数据流
	例如:
	引导用例可能需要从 S3 读取几天的有界输入,
	然后才能继续读取来自 Kafka 的最新无界输入。
	当有界文件输入完成而不中断应用程序时,HybridSource 从 FileSource 切换到 KafkaSource。

    org.apache.flink
    flink-connector-base
    1.14.0

2.1 固定起始位置
// 切换时间戳
long switchTimestamp = ...; 
//读取文件数据,有界流
FileSource fileSource = FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
//读取kafka数据,从指定时间开始消费
KafkaSource kafkaSource =
          KafkaSource.builder()
                  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                  .build();
//混合源 构建 文件源 添加kafka源 
HybridSource hybridSource =
          HybridSource.builder(fileSource)
                  .addSource(kafkaSource)
                  .build();
2.2 动态切换位置

//自定义文件源
FileSource fileSource = CustomFileSource.readTillOneDayFromLatest();
HybridSource hybridSource =
    HybridSource.builder(fileSource) //给这个文件源添加枚举类
        .addSource(
            switchContext -> { //上下文
            //自定义文件拆分枚举器
    		CustomFileSplitEnumerator previousEnumerator =  //获取以前的枚举器
                  switchContext.getPreviousEnumerator(); //从上下文总获取上一个枚举器
              //如何获取时间戳取决于特定的枚举器
              long switchTimestamp = previousEnumerator.getEndTimestamp(); //从以前的枚举器获取结束时间戳当做 不同数据源的切换时间戳
              //根据切换时间戳读取kafka数据源数据
              KafkaSource kafkaSource =
                  KafkaSource.builder()
                  	  //设置开始偏移量  偏移量初始化程序的时间戳为切换时间戳 +1 
                      .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                      .build();
              return kafkaSource;
            },
            Boundedness.CONTINUOUS_UNBOUNDED)
        .build();
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487109.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号