栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink Catalogs 如何玩转元数据中心?

Flink Catalogs 如何玩转元数据中心?

产生背景

在flink 1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库表或者读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。
操作冗余且繁琐,体验极差。

实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题!

1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。

目前支持的外部元数据,有如下2个:

  • 使用HiveCatalog,读取hive metastore中的元数据定义
  • 使用PostgresCatalog,读取PostgreSQL中的元数据定义
什么是Catalogs?

Catalogs提供元数据的管理,并提供一个统一的API来管理元数据。
元数据包括:数据库、表、分区、视图、函数和访问存储在数据库或其他外部系统中的数据所需的信息。

数据处理最关键的一个方面是管理元数据。

  • 临时元数据:如临时表、在table environment中注册的udf。
  • 永久元数据,类似于hive的metastore。

Catalog允许用户在其数据系统中引用现有的元数据,并自动将它们映射到Flink的相应元数据。
例如,Flink可以自动将JDBC表映射到Flink表,用户不需要在Flink中手动重写ddl。Catalog极大地简化了使用用户现有系统开始使用Flink所需的步骤,并极大地增强了用户体验。

Catalog Types

Catalog根据实现方式,分为以下四种:

  • GenericInMemoryCatalog 通用内存Catalog: 所有对象将仅在会话【session】的生命周期内可用
  • JdbcCatalog:
    • JdbcCatalog允许用户通过JDBC协议将Flink连接到关系数据库
    • PostgresCatalog是目前JDBC Catalog的唯一实现,且仅支持有限的Catalog方法,包括:
      • databaseExists(String databaseName)
      • listDatabases()
      • getDatabase(String databaseName)
      • listTables(String databaseName)
      • getTable(ObjectPath tablePath)
      • tableExists(ObjectPath tablePath)
  • HiveCatalog: HiveCatalog有如下2个用途:
    • Flink元数据的持久存储
    • Hive元数据读写接口
    • 注意: Hive metastore以小写形式存储所有元对象名称。这与区分大小写的GenericInMemoryCatalog不同
  • User-Defined Catalog: Catalog支持插件,用户可以通过实现Catalog接口开发自定义的Catalog
    • 通过实现CatalogFactory接口来实现相应的Catalog工厂
    • 通过SPI加载CatalogFactory的实现类【在JAR中定义meta_INF/services/org.apache.flink.table.factories.Factory】
如何使用catalog 配置conf/sql-client-defaults.yaml
execution:
    planner: blink
    type: batch
    current-catalog: pg28
    current-database: das
catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /etc/hive/conf
     default-database: default
   - name: pg28
     type: jdbc
     username: videoweb
     password: abcabc@123
     base-url: jdbc:postgresql://172.25.2.8:5432/
     default-database: das
启动sql client
./bin/sql-client.sh embedded 
-j ../jars/flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar 
-j ../jars/flink-connector-jdbc_2.11-1.13.2.jar 
-j ../jars/postgresql-42.2.5.jar shell

catalog操作

show catalogs;
use catalog pg28;
use das;
show tables;
flink针对hive的集成

Apache Hive已经将自己确立为数据仓库生态系统的焦点。它不仅是一个用于大数据分析和ETL的SQL引擎,也是一个数据管理平台,在这里可以发现、定义和发展数据。

在Hadoop生态系统中,Hive metastore已经发展成为事实上的元数据中心。许多公司在生产中都有一个Hive metastore服务实例来管理所有的元数据,无论是Hive元数据还是非Hive元数据,作为真相的来源。

Flink提供了与Hive的双重集成:

  • 把Hive metastore作为一个持久化catalog,使用HiveCatalog来存储Flink 跨会话 的特定元数据,例如,用户可以使用HiveCatalog将Kafka或ElasticSearch表存储在Hive metastore中,并在以后的SQL查询中重用它们。
  • 将 Flink 作为一个用于读写Hive表的替代引擎

HiveCatalog的设计是“开箱即用”的,与现有的Hive安装兼容。
注意: 强烈建议用户与Hive集成时,使用blink planner。

目前hive metastore版本与flink版本的对应关系如下:

hive catalog

HiveCatalog可用于处理两种类型的表:

  • hive兼容表: hive兼容表是指以hive兼容的方式存储的表,包括存储层中的元数据和数据。因此,通过Flink创建的与Hive兼容的表可以从Hive侧查询。
  • 通用表:
    • 是Flink特有的。
    • 在使用HiveCatalog创建通用表时,我们只是使用HMS【hive metastore】持久化元数据。
    • 虽然这些表对Hive是可见的,但Hive不太可能理解元数据。因此,在Hive中使用这样的表会导致未定义的行为。
    • Flink使用属性’ is_generic '来判断一个表是hive兼容的还是通用的
使用示例

启动sql client

 ./bin/sql-client.sh embedded 
 -j ../jars/flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar 
 -j ../jars/flink-connector-jdbc_2.11-1.13.2.jar 
 -j ../jars/postgresql-42.2.5.jar 
 -j ../jars/flink-sql-connector-kafka_2.11-1.13.2.jar shell

CREATE TABLE mykafka (name String, age Int) WITH (
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '172.25.6.7:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
describle mykafka;

select * from mykafka;


在hive client中打印表结构: describe formatted mykafka;

参考

flkink catalogs
hive catalog
flink针对hive的集成

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307320.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号