栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink整合Hive

Flink整合Hive

文章目录
    • Flink整合Hive
      • 1、将整合需要的jar包上传到flink的lib目录
      • 2、启动hive元数据服务
      • 3、如果在sql-client中使用hive的catalog
      • 4、在sql-client中使用hive的catalog
      • 5、idea里写flinkSQL打包上传到集群运行

Flink整合Hive 1、将整合需要的jar包上传到flink的lib目录
#需要三个jar包
flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar
hive-exec-1.2.1.jar
hive-metastore-1.2.1.jar

上传jar 之后需要重新启动yarn-session.sh(就重启flink,用的是yarn-session模式就重启这个模式)

yarn application -kill 进程id
yarn-session.sh -jm 1024m -tm 1096
2、启动hive元数据服务
 nohup hive --service metastore >> metastore.log 2>&1 &
3、如果在sql-client中使用hive的catalog
#修改sql-client-defaults.yaml
cd /usr/local/soft/flink-1.11.2/conf
vim sql-client-defaults.yaml

#添加如下内容:
catalogs: 
  - name: myhive
    type: hive
    hive-conf-dir: /usr/local/soft/hive-1.2.1/conf
    default-database: default

4、在sql-client中使用hive的catalog
#启动 SQL 客户端命令行界面,显式使用 embedded 模式:
sql-client.sh embedded

USE CATALOG myhive;
show tables;

在flink中创建的表在hive中就可以查看,不能查询数据
hive中的表在flink中可以查询

5、idea里写flinkSQL打包上传到集群运行

导入hive所需依赖包:


            org.apache.flink
            flink-connector-hive_2.11
            1.11.2
        

            org.apache.hive
            hive-exec
            1.2.1

package com.liu.sql

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog


object Demo7FlinkonHive {
  def main(args: Array[String]): Unit = {
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val bsSettings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner() //使用blink的计划器
      .inStreamingMode() //使用流模型
      .build()

    //窗口table 环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    val configuration = new Configuration()
    //如果主键为null,自动删除
    configuration.setString("table.exec.sink.not-null-enforcer", "drop")
    configuration.setString("table.dynamic-table-options.enabled", "true")
    bsTableEnv.getConfig.addConfiguration(configuration)

    

    val name = "myhive"
    val defaultDatabase = "flink" //先去创建flink数据库
    val hiveConfDir = "/usr/local/soft/hive-1.2.1/conf"

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
    //注册catalog
    bsTableEnv.registerCatalog("myhive", hive)

    // 切换catalog
    bsTableEnv.useCatalog("myhive")

    
    bsTableEnv.executeSql(
      """
        |insert into mysql_clazz_num
        |select clazz,count(1) as num from
        |student 
        |group by clazz
        |""".stripMargin)
    
    //这是hint语法:
    //要放在每个表的后面
     


    
     // select * from student ;
     
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/613234.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号