- Flink整合Hive
- 1、将整合需要的jar包上传到flink的lib目录
- 2、启动hive元数据服务
- 3、如果在sql-client中使用hive的catalog
- 4、在sql-client中使用hive的catalog
- 5、idea里写flinkSQL打包上传到集群运行
#需要三个jar包 flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar hive-exec-1.2.1.jar hive-metastore-1.2.1.jar
上传jar之后需要重启flink;如果使用的是yarn-session模式,则需要重新启动yarn-session.sh:
yarn application -kill 进程id
yarn-session.sh -jm 1024m -tm 1096
2、启动hive元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &
3、如果在sql-client中使用hive的catalog
#修改sql-client-defaults.yaml
cd /usr/local/soft/flink-1.11.2/conf
vim sql-client-defaults.yaml
#添加如下内容:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /usr/local/soft/hive-1.2.1/conf
    default-database: default
4、在sql-client中使用hive的catalog
#启动 SQL 客户端命令行界面,显式使用 embedded 模式:
sql-client.sh embedded
#进入sql-client后,切换到hive的catalog并查看表:
USE CATALOG myhive;
show tables;
在flink中创建的表在hive中可以查看到,但不能在hive中查询其数据
hive中的表在flink中可以查询
导入hive所需依赖包:
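<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>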
package com.liu.sql
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog
/**
 * Flink-on-Hive demo job.
 *
 * Builds a Blink-planner streaming table environment, registers a Hive catalog
 * on it, switches to that catalog and submits an aggregating `INSERT` statement.
 */
object Demo7FlinkonHive {

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Blink planner, streaming mode.
    val plannerSettings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    // Create the table environment.
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, plannerSettings)

    val tableOptions = new Configuration()
    Seq(
      // Drop records whose NOT NULL column (e.g. the primary key) is null.
      "table.exec.sink.not-null-enforcer" -> "drop",
      // Allow dynamic table options (OPTIONS hints) inside SQL statements.
      "table.dynamic-table-options.enabled" -> "true"
    ).foreach { case (key, value) => tableOptions.setString(key, value) }
    tableEnv.getConfig.addConfiguration(tableOptions)

    val catalogName = "myhive"
    val hiveCatalog = new HiveCatalog(
      catalogName,
      "flink", // default database; the `flink` database has to be created beforehand
      "/usr/local/soft/hive-1.2.1/conf"
    )

    // Register the catalog, then make it the current one.
    tableEnv.registerCatalog(catalogName, hiveCatalog)
    tableEnv.useCatalog(catalogName)

    // Count the rows of `student` per `clazz` and write the counts into
    // `mysql_clazz_num`; both tables are presumably already defined in the
    // catalog's default database (not visible from this file).
    val countPerClazzInsert =
      """
        |insert into mysql_clazz_num
        |select clazz,count(1) as num from
        |student
        |group by clazz
        |""".stripMargin
    tableEnv.executeSql(countPerClazzInsert)

    // Note on hint syntax: an OPTIONS hint has to be placed directly after each
    // table it applies to, e.g.
    //   select * from student /*+ OPTIONS('key'='value') */ ;
  }
}



