栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark处理数据落地Hudi同步HIVE(01)

spark处理数据落地Hudi同步HIVE(01)

注意本案是以HDFS离线数据为例
1 spark操作hive 

sparksql读取hive中的数据不需要hive参与 , 读取HDFS中的数据和mysql中的元数据信息即可

Sparksql本身就内置了hive功能

加载hive的数据,本质上是不需要hive参与的,因为hive的表数据就在hdfs中,hive的表定义信息在mysql中

不管数据还是定义,sparksql都可以直接去获取!

步骤:

  1. 要在工程中添加spark-hive的依赖jar
  2. 要在工程中添加mysql连接驱动依赖jar
  3. 下载三个文件 core-site.xml hdfs-site.xml hive-site.xml  到项目的resources下

依赖如下



    
        doitedu-hange
        org.example
        1.0-SNAPSHOT
    
    4.0.0

    hudi
    
        
            org.apache.hudi
            hudi-client
            0.8.0
            pom
        
        
            org.apache.hudi
            hudi-spark-bundle_2.12
            0.8.0
        
        
            org.apache.hudi
            hudi-hadoop-mr-bundle
            
                
                    jackson-databind
                    com.fasterxml.jackson.core
                
                
                    com.fasterxml.jackson.core
                    jackson-annotations
                
            
            0.8.0
        

        
        
            org.apache.spark
            spark-core_2.12
            3.0.0
        
        
        
        
        
        
        
            org.apache.spark
            spark-sql_2.12
            3.0.0
        
        
            org.apache.spark
            spark-hive_2.12
            3.0.0
        
        
            org.apache.spark
            spark-avro_2.12
            3.0.0
        
        
            org.apache.hadoop
            hadoop-client
            2.7.2
        
        
            org.spark-project.hive
            hive-jdbc
            1.2.1.spark2
        

        
            org.apache.spark
            spark-hive_2.12
            3.0.0
        

        
            mysql
            mysql-connector-java
            5.1.48
        

    

创建sparksession时需要调用.enableHiveSupport( )方法

System.setProperty("HADOOP_USER_NAME","root") ;
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables","true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

代码如下

object LoadDadaFromHIve {
  def main(args: Array[String]): Unit = {
    // 获取Sparksession 对象
   val session =  SparkSession.builder()
      .master("local[*]")
      .appName("hive_demo")
      .enableHiveSupport()  // 开启对hive的支持
      .getOrCreate()

    
    session.sql("select * from doit14.emp") .show()  //
    session.stop()
  }

}

注意: 启动mysql的元数据服务和hiveserver2服务 

这一步执行成功以后 , 说明我们的项目环境是可以操作HIVE的 , 后面的就可以写HUDI的时候同步数据到HIVE表中

2 spark写数据到Hudi

添加依赖 , 准备环境 

object OpHive {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //1  获取sparksession对象
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    import org.apache.spark.sql.functions._
   // 2 加载数据  创建DF  
    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType().add("id", DataTypes.StringType).add("name", DataTypes.StringType).add("age", DataTypes.IntegerType).add("city", DataTypes.StringType).add("score", DataTypes.DoubleType)
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user.txt")
      .withColumn("ts", lit(commitTime))
     //3 将DF 数据写到到hudi中
    df.write.format("hudi")
      .options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(TABLE_NAME, "tb_user_f") //hudi中的表
      .mode(Append).
      //hudi保存数据的地址
      save("/hudi/tb_hudi_user_f");
  }
}

查看指定目录中的数据

3 Spark写数据到Hudi同步到Hive
package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}


object OpHive_NoPartition {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  
  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender" , DataTypes.StringType)
    // 加载数据同时添加一个 时间字段
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))
    df.show
   // 注册hive驱动
 Class.forName("org.apache.hive.jdbc.HiveDriver")
   // 将数据写入到HUDI  同时同步到hive中
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
     // .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender")//指定多个分区字段
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY , "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(TABLE_NAME, "tb_user_d")  //hudi中的表
      .option("hoodie.datasource.hive_sync.enable", true)  // 同步数据到hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_d") //hive中的表
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
     // .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // hive分区字段  指定多个分区字段
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
      mode(Append).
      save("/hudi/tb_hudi_user_d");
  }

}

在hive中查看表 

查看hive中表结构 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/334999.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号