1. Catalog types
1.1 GenericInMemoryCatalog
1.2 JdbcCatalog
1.3 HiveCatalog
2. Catalog SQL usage
2.1 Databases
2.2 Tables
1. Catalog types

1.1 GenericInMemoryCatalog

The default catalog type. Meta-object names are case-sensitive. The default catalog is default_catalog, and it contains a default database named default_database.
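These defaults can be checked directly in the SQL Client (a minimal sketch; in a fresh session the current catalog and database fall back to the built-in defaults):

```sql
-- Run in a fresh Flink SQL Client session
SHOW CURRENT CATALOG;   -- default_catalog
SHOW CURRENT DATABASE;  -- default_database
```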
1.2 JdbcCatalog

Currently only the Postgres database is supported.
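A JdbcCatalog can be registered from the SQL Client with a CREATE CATALOG statement; a sketch (the catalog name, database, credentials, and host are placeholders, not values from this setup):

```sql
CREATE CATALOG my_jdbc WITH (
    'type' = 'jdbc',
    'default-database' = 'mydb',        -- placeholder Postgres database
    'username' = 'postgres',            -- placeholder credentials
    'password' = 'secret',
    'base-url' = 'jdbc:postgresql://localhost:5432'  -- placeholder host:port
);
```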
1.3 HiveCatalog

Stores all meta-object names in lowercase.
It serves two purposes:
- Storing Flink's own metadata. This metadata cannot be used by Hive.
- Reading and writing existing Hive metadata, i.e. reading and writing the data already in Hive. A table created this way must set the property 'connector' = 'hive'.
Installing the HiveCatalog
- Add the dependency jar to the lib directory on every Flink server, then restart Flink:
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.3/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
[root@flink1 ~]# scp /root/flink-1.14.3/lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar root@flink2:/root/flink-1.14.3/lib/
flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar          100%   46MB  75.9MB/s   00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar root@flink3:/root/flink-1.14.3/lib/
flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar          100%   46MB  75.2MB/s   00:00
- Create a Hive configuration directory hive_conf on every Flink server, then copy all files from the Hive cluster's conf directory into it:
[root@flink1 ~]# mkdir /root/flink-1.14.3/hive_conf

[root@hive1 ~]# scp /root/apache-hive-3.1.2-bin/conf/* root@192.168.23.101:/root/flink-1.14.3/hive_conf
Also add the IP mappings of all Hive servers to /etc/hosts on every Flink server.
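A minimal sketch of such a mapping (the IP addresses below are illustrative; substitute the actual addresses of your Hive servers):

```
# /etc/hosts on every Flink server (example addresses)
192.168.23.111  hive1
192.168.23.112  hive2
192.168.23.113  hive3
```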
- Create a database in Hive to hold the Flink metadata:
0: jdbc:hive2://hive1:10000> create database flink;
INFO : Compiling command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e): create database flink
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO : Completed compiling command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e); Time taken: 1.006 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e): create database flink
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e); Time taken: 3.957 seconds
INFO : OK
INFO : Concurrency mode is disabled, not creating a lock manager
No rows affected (5.604 seconds)
- Create a temporary catalog in Flink and switch to it. The catalog is gone once you exit the SQL Client:
Flink SQL> create catalog my_hive with(
>     'type' = 'hive',
>     'default-database' = 'flink',
>     'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|         my_hive |
+-----------------+
2 rows in set

Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.
You can also upload the hive_conf directory to the root of the HDFS cluster used by Flink, and then configure 'hive-conf-dir' = 'hdfs://nnha/hive_conf'.
To create a permanent catalog in Flink, add the following file on every Flink server, with this content:
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
execution:
type: streaming
current-catalog: default_catalog
current-database: default_database
catalogs:
- name: my_hive
type: hive
default-database: flink
hive-conf-dir: /root/flink-1.14.3/hive_conf
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
When tested, however, show catalogs did not list the my_hive catalog, and the Flink logs showed no errors. Only the temporary-catalog approach described above worked. (YAML-based SQL Client configuration was removed in Flink 1.14, which likely explains why this file is ignored.)
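In Flink 1.14, an SQL initialization script passed to the SQL Client with -i can play the same role as the old YAML file; a sketch (the file path is illustrative):

```sql
-- /root/flink-1.14.3/conf/init.sql (illustrative path)
create catalog my_hive with (
    'type' = 'hive',
    'default-database' = 'flink',
    'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
);
use catalog my_hive;
```

The script runs at session start via ./bin/sql-client.sh -i /root/flink-1.14.3/conf/init.sql, but note the catalog is still re-created per session rather than persisted by Flink itself.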
- When a table is created in Flink's Hive catalog, the table metadata also appears in Hive's flink database:
Flink SQL> create table blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.
Flink SQL>
0: jdbc:hive2://hive1:10000> use flink;
0: jdbc:hive2://hive1:10000> show tables;
+------------------+
|     tab_name     |
+------------------+
| blackhole_table  |
+------------------+

2. Catalog SQL usage

2.1 Databases
Flink SQL> create database my_hive.flink;
[INFO] Execute statement succeed.

Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+---------------+
| database name |
+---------------+
|       default |
|         flink |
|       test_db |
+---------------+
3 rows in set

Flink SQL> drop database my_hive.flink;
[INFO] Execute statement succeed.
Switching the catalog and database through the Scala API:
tEnv.useCatalog("my_hive")
tEnv.useDatabase("flink")
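The two calls above can be placed in a self-contained program; a sketch that registers the my_hive catalog itself via executeSql so the snippet stands alone (same properties as in the SQL Client session earlier):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object CatalogSwitch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Register the Hive catalog first, as in the SQL Client
    tEnv.executeSql(
      """create catalog my_hive with (
        |  'type' = 'hive',
        |  'default-database' = 'flink',
        |  'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
        |)""".stripMargin)

    // Subsequent unqualified table names resolve against my_hive.flink
    tEnv.useCatalog("my_hive")
    tEnv.useDatabase("flink")
  }
}
```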
2.2 Tables
Flink SQL> create table my_hive.flink.blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.
Flink SQL>
Flink SQL> alter table my_hive.flink.blackhole_table rename to blackhole_table_new;
[INFO] Execute statement succeed.
Flink SQL>
Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.
Flink SQL> use flink;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+---------------------+
| table name |
+---------------------+
| blackhole_table_new |
+---------------------+
1 row in set
Flink SQL> drop table my_hive.flink.blackhole_table_new;
[INFO] Execute statement succeed.
Flink SQL>



