栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink TableAPI和SQL(一)快速上手

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink TableAPI和SQL(一)快速上手

文章目录
  • 快速上手
  • 一个简单示例

快速上手

如果我们对关系型数据库和 SQL 非常熟悉,那么 Table API 和 SQL 的使用其实非常简单:只要得到一个“表”(Table),然后对它调用 Table API,或者直接写 SQL 就可以了。接下来我们就以一个非常简单的例子上手,初步了解一下这种高层级 API 的使用方法。

Flink1.13版本不算稳定,依然在不停的调整和更新,关注 原理和基本用法。

需要引入的依赖


  org.apache.flink
  flink-table-api-java-bridge_${scala.binary.version}
  ${flink.version}

这里的依赖是一个 Java 的“桥接器”(bridge),主要就是负责 Table API 和下层 DataStream API 的连接支持,按照不同的语言分为 Java 版和 Scala 版。
如果我们希望在本地的集成开发环境(IDE)里运行 Table API 和 SQL,还需要引入以下依赖:


  org.apache.flink
  flink-table-planner-blink_${scala.binary.version}
  ${flink.version}


  org.apache.flink
  flink-streaming-scala_${scala.binary.version}
  ${flink.version}

这里主要添加的依赖是一个“计划器”(planner),它是 Table API 的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner,所以在生产集群环境中提交的作业不需要打包这个依赖。

而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个 Scala 版流处理的相关依赖。另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:


  org.apache.flink
  flink-table-common
  ${flink.version}

一个简单示例
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        //1.创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据源
        SingleOutputStreamOperator dataStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        //2.创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream 转换成 Table
        Table eventTable = tableEnv.fromDataStream(dataStream);

        //TableAPI方式编写
        Table result1 = eventTable.select($("user"), $("url"), $("timestamp"))
                .where($("user")
                        .isEqual("Bob"));

        //SQL方式输出
        String sql = "select user,url,`timestamp` from " + eventTable;
        Table result2 = tableEnv.sqlQuery(sql);

        //输出
        tableEnv.toDataStream(result1).print("TableAPI");
        tableEnv.toDataStream(result2).print("SQL");

        env.execute();
    }
}

这里我们需要创建一个“表环境”(TableEnvironment),然后将数据流(DataStream)转换成一个表(Table);之后就可以执行 SQL 在这个表中查询数据了。查询得到的结果依然是一个表,把它重新转换成流就可以打印输出了。

这里的$符号是 Table API 中定义的“表达式”类 Expressions 中的一个方法,传入一个字段名称,就可以指代数据中对应字段。将得到的表转换成流打印输出,会发现结果与直接执行SQL 完全一样。

注意:写sql语句中,如果遇到字段与sql中的关键字冲突时,一定要在字段上加入`` 号,例如 timestamp

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/863118.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号