栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink-sql kafka 实时写入Hive

Flink-sql kafka 实时写入Hive

Flink-sql kafka 实时写入Hive
  • 参考文档
  • 环境
  • pom
  • flink lib 文件
  • Demo
  • Hive

参考文档

时间定义
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/concepts/time_attributes/

环境
flink 1.13.1
java 1.8
scala 2.11
hive 2.1.1-cdh6.1.1
hadoop 3.0.0-cdh6.1.1

pom
  
    UTF-8
    1.8
    1.8
    1.13.1
    2.11

    2.1.1-cdh6.1.1

    3.0.0-cdh6.1.1
    2.8.2
  
  
  
  
  
  
  
  
      
          
              cloudera
              https://repository.cloudera.com/artifactory/cloudera-repos/
          
          
          
          
          
      































































  
    
        commons-cli
        commons-cli
        1.4
    
    
    






    
      org.apache.flink
      flink-streaming-scala_${scala.version}
      ${flink.version}

    
    
      org.apache.flink

      flink-table-api-scala-bridge_2.11
      ${flink.version}

    
    
      org.apache.flink
      flink-table-api-java-bridge_2.11

      ${flink.version}

    








    
      org.apache.flink
      flink-clients_${scala.version}
      ${flink.version}

    

    




















    
    







    
      org.apache.flink
      flink-table-planner-blink_2.11
      ${flink.version}
      provided
    
    
    
      org.apache.flink
      flink-table-runtime-blink_${scala.version}
      ${flink.version}
      provided
    


    





    


    
      org.apache.flink
      flink-table-common
      ${flink.version}

    
    


    
      org.apache.flink
      flink-sql-connector-kafka_${scala.version}
      ${flink.version}
    

    
      org.apache.flink
      flink-json
      ${flink.version}
    
    
      org.apache.flink
      flink-connector-hive_2.11
      ${flink.version}
      
    

    
      org.apache.hive
      hive-exec
      ${hive.version}

      
        
          calcite-avatica
          org.apache.calcite
        
        
          calcite-core
          org.apache.calcite
        
        
          calcite-linq4j
          org.apache.calcite
        
      
      
    

    
      org.apache.hadoop
      hadoop-client
      ${hadoop.version}
      provided
    
    
    
    
    
    
    


    
      org.apache.flink
      flink-parquet_2.11
      ${flink.version}
    

    
      org.apache.flink
      flink-orc_2.11
      ${flink.version}
    

    
      org.apache.flink
      flink-connector-kafka_${scala.version}
      ${flink.version}
    
    
      org.apache.flink
      flink-connector-base
      ${flink.version}
    


    













    
      org.apache.logging.log4j
      log4j-slf4j-impl
      ${log4j.version}
      runtime
    
    
      org.apache.logging.log4j
      log4j-api
      ${log4j.version}
      runtime
    
    
      org.apache.logging.log4j
      log4j-core
      ${log4j.version}
      runtime
    






    
      mysql
      mysql-connector-java
      5.1.46
    
    
      org.apache.flink
      flink-connector-jdbc_2.11
      ${flink.version}
    
    
    
    
    
    
  

flink lib 文件

antlr-runtime-3.5.2.jar
com.ibm.icu-4.4.2.jar
commons-cli-1.4.jar
flink-connector-hive_2.11-1.13.1.jar
flink-connector-kafka_2.11-1.13.1.jar
flink-core-1.13.0.jar
flink-csv-1.13.1.jar
flink-dist_2.11-1.13.1.jar
flink-json-1.13.1.jar
flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-hive-2.2.0_2.11-1.13.3.jar
flink-table-api-java-1.13.1.jar
flink-table-api-java-bridge_2.11-1.13.1.jar.0
flink-table-api-scala_2.11-1.13.1.jar.0
flink-table-planner_2.11-1.13.1.jar
flink-table-planner-blink_2.11-1.13.1.jar
flink-table-runtime-blink_2.11-1.13.1.jar
jindo-flink-sink-3.6.1.jar
jindofs-sdk-3.6.1.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
Demo
package com.gwm.core.kafka

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import com.gwm.utils.log.Logging
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog


object TestFlinkKafka extends Logging{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //checkpoint
    env.enableCheckpointing(1000 * 60,CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage("hdfs://test/checkpoint/")

//    val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(5 * 60 * 1000)
//    val checkpointConfig = env.getCheckpointConfigcheckpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000)
//    checkpointConfig.setCheckpointTimeout(3 * 60 * 1000)checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    //stream table env
    val tableEnv = StreamTableEnvironment.create(env, settings)

    //hive cataLog
    val hiveCatalogName: String = "kafka_hc"
    val defaultDatabasesName: String = "test"
    val hiveConfDir: String = "/alidata1/tmp/hivecf/"

    val cataLog: HiveCatalog = new HiveCatalog(hiveCatalogName, defaultDatabasesName, hiveConfDir)
    tableEnv.registerCatalog("kafka_hc", cataLog)

    tableEnv.useCatalog(hiveCatalogName)

    //获取kafka 流
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    tableEnv.executeSql("drop table if exists KafkaSourceTable")

    //ts as to_timestamp(from_unixtime(tid/1000, 'yy-MM-dd HH:mm:ss'))

    val createTableSql: String =
      """
        |create table KafkaSourceTable(
        | vin String,
        | tid String,
        | source String,
        | datas String,
        | ts as substr(concat('20',tid),1,8 )
        | ) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'topic_c',
        | 'properties.bootstrap.servers' = 'ip:9092,ip2:9092,ip3:9092',
        | 'properties.group.id' = 'flink-test-group',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset',
        | 'json.ignore-parse-errors' = 'true',
        | 'scan.startup.mode' = 'group-offsets'
        |)
        """.stripMargin

    
    log.info(s"createTableSql: $createTableSql")
    tableEnv.executeSql(createTableSql)




//    //hvie cataLog
    tableEnv.useCatalog("kafka_hc")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("test")

//    val dropSql = ""

    val hiveSql: String =
      """
        |create table if not exists HiveSinkTable(
        | vin String,
        | tid String,
        | source String,
        | datas String )
        |  partitioned by (`date` String)
        | row format delimited fields terminated by 't'
        |stored as orc
        |tblproperties (
        | 'orc.compress'='SNAPPY',
        | 'partition.time-extractor.timestamp-pattern' = '$date 00:00:00',
        | 'sink.partition-commit.trigger' = 'process-time',
        | 'sink.partition-commit.policy.kind' = 'metastore,success-file'
        |)
        |""".stripMargin

    tableEnv.executeSql(hiveSql)


    //切换方言
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    val insertSql: String =
      """
        |insert into HiveSinkTable
        |select vin, tid, source, datas, from_unixtime(unix_timestamp(ts,'yyyyMMdd'),'yyyy-MM-dd') as `date`
        |from KafkaSourceTable
        |""".stripMargin
    //select vin, tid, sources, datas, date_format(ts, 'yyyy-MM-dd') as `date`
    log.info(s"insertSql: $insertSql")

    tableEnv.executeSql(insertSql)




//    env.execute("kafka to hive")

  }
}


Hive



tid 上传时间不标准,所以分区不准确。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612683.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号