栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

11.3.3、sql客户端

11.3.3、sql客户端

1、hbase的数据整合

读取canal-json的数据到hbase中
(1)导包


    org.apache.flink
    flink-connector-hbase_2.11
    1.11.2

(2)代码实现

object Demo10toHbase {
  def main(args: Array[String]): Unit = {

    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner() //使用blink的计划器
      .inStreamingMode() //使用流模型
      .build()

    //窗口table 环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
//    val configuration = new Configuration()
//    //如果主键为null,自动删除
//    configuration.setString("table.exec.sink.not-null-enforcer", "drop")
//    configuration.setString("table.dynamic-table-options.enabled", "true")
//    bsTableEnv.getConfig.addConfiguration(configuration)

    
    bsTableEnv.executeSql(
      """
        |
        |CREATE TABLE canal_student (
        | id STRING,
        | name STRING,
        | age BIGINT,
        | gender STRING,
        | clazz STRING
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'student.student',
        | 'properties.bootstrap.servers' = 'master:9092',
        | 'properties.group.id' = 'sdfg',
        | 'format' = 'canal-json',
        | 'scan.startup.mode' = 'earliest-offset',
        | 'canal-json.ignore-parse-errors' = 'true'
        |)
        |
      """.stripMargin)
    
    bsTableEnv.executeSql(
      """
        |
        |CREATE TABLE hbase_sink (
        | id STRING,
        | info ROW,
        | PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        | 'connector' = 'hbase-1.4',
        | 'table-name' = 'student',
        | 'zookeeper.quorum' = 'master:2181'
        |)
        |
      """.stripMargin)

    
    bsTableEnv.executeSql(
      """
        |insert into hbase_sink
        |select id,ROW(name,age,gender,clazz) as info from canal_student
        |
      """.stripMargin)
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761948.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号