在flink 1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库表或者读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。
操作冗余且繁琐,体验极差。
实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题!
1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。
目前支持的外部元数据,有如下2个:
- 使用HiveCatalog,读取hive metastore中的元数据定义
- 使用PostgresCatalog,读取PostgreSQL中的元数据定义
Catalogs提供元数据的管理,并提供一个统一的API来管理元数据。
元数据包括:数据库、表、分区、视图、函数和访问存储在数据库或其他外部系统中的数据所需的信息。
数据处理最关键的一个方面是管理元数据。
- 临时元数据:如临时表、在table environment中注册的udf。
- 永久元数据,类似于hive的metastore。
Catalog允许用户在其数据系统中引用现有的元数据,并自动将它们映射到Flink的相应元数据。
例如,Flink可以自动将JDBC表映射到Flink表,用户不需要在Flink中手动重写ddl。Catalog极大地简化了使用用户现有系统开始使用Flink所需的步骤,并极大地增强了用户体验。
Catalog根据实现方式,分为以下四种:
- GenericInMemoryCatalog 通用内存Catalog: 所有对象将仅在会话【session】的生命周期内可用
- JdbcCatalog:
- JdbcCatalog允许用户通过JDBC协议将Flink连接到关系数据库
- PostgresCatalog是目前JDBC Catalog的唯一实现,且仅支持有限的Catalog方法,包括:
- databaseExists(String databaseName)
- listDatabases()
- getDatabase(String databaseName)
- listTables(String databaseName)
- getTable(ObjectPath tablePath)
- tableExists(ObjectPath tablePath)
- HiveCatalog: HiveCatalog有如下2个用途:
- Flink元数据的持久存储
- Hive元数据读写接口
- 注意: Hive metastore以小写形式存储所有元对象名称。这与区分大小写的GenericInMemoryCatalog不同
- User-Defined Catalog: Catalog支持插件,用户可以通过实现Catalog接口开发自定义的Catalog
- 通过实现CatalogFactory接口来实现相应的Catalog工厂
- 通过SPI加载CatalogFactory的实现类【在JAR中定义meta_INF/services/org.apache.flink.table.factories.Factory】
execution:
planner: blink
type: batch
current-catalog: pg28
current-database: das
catalogs:
- name: myhive
type: hive
hive-conf-dir: /etc/hive/conf
default-database: default
- name: pg28
type: jdbc
username: videoweb
password: abcabc@123
base-url: jdbc:postgresql://172.25.2.8:5432/
default-database: das
启动sql client
./bin/sql-client.sh embedded -j ../jars/flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar -j ../jars/flink-connector-jdbc_2.11-1.13.2.jar -j ../jars/postgresql-42.2.5.jar shell
catalog操作
show catalogs; use catalog pg28; use das; show tables;flink针对hive的集成
Apache Hive已经将自己确立为数据仓库生态系统的焦点。它不仅是一个用于大数据分析和ETL的SQL引擎,也是一个数据管理平台,在这里可以发现、定义和发展数据。
在Hadoop生态系统中,Hive metastore已经发展成为事实上的元数据中心。许多公司在生产中都有一个Hive metastore服务实例来管理所有的元数据,无论是Hive元数据还是非Hive元数据,作为真相的来源。
Flink提供了与Hive的双重集成:
- 把Hive metastore作为一个持久化catalog,使用HiveCatalog来存储Flink 跨会话 的特定元数据,例如,用户可以使用HiveCatalog将Kafka或ElasticSearch表存储在Hive metastore中,并在以后的SQL查询中重用它们。
- 将 Flink 作为一个用于读写Hive表的替代引擎
HiveCatalog的设计是“开箱即用”的,与现有的Hive安装兼容。
注意: 强烈建议用户与Hive集成时,使用blink planner。
目前hive metastore版本与flink版本的对应关系如下:
HiveCatalog可用于处理两种类型的表:
- hive兼容表: hive兼容表是指以hive兼容的方式存储的表,包括存储层中的元数据和数据。因此,通过Flink创建的与Hive兼容的表可以从Hive侧查询。
- 通用表:
- 是Flink特有的。
- 在使用HiveCatalog创建通用表时,我们只是使用HMS【hive metastore】持久化元数据。
- 虽然这些表对Hive是可见的,但Hive不太可能理解元数据。因此,在Hive中使用这样的表会导致未定义的行为。
- Flink使用属性’ is_generic '来判断一个表是hive兼容的还是通用的
启动sql client
./bin/sql-client.sh embedded -j ../jars/flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar -j ../jars/flink-connector-jdbc_2.11-1.13.2.jar -j ../jars/postgresql-42.2.5.jar -j ../jars/flink-sql-connector-kafka_2.11-1.13.2.jar shell
CREATE TABLE mykafka (name String, age Int) WITH ( 'connector' = 'kafka', 'topic' = 'flink_test', 'properties.bootstrap.servers' = '172.25.6.7:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); describle mykafka; select * from mykafka;
在hive client中打印表结构: describe formatted mykafka;
flkink catalogs
hive catalog
flink针对hive的集成



