注意本案是以HDFS离线数据为例1 spark操作hive
sparksql读取hive中的数据不需要hive参与 , 读取HDFS中的数据和mysql中的元数据信息即可
Sparksql本身就内置了hive功能
加载hive的数据,本质上是不需要hive参与的,因为hive的表数据就在hdfs中,hive的表定义信息在mysql中
不管数据还是定义,sparksql都可以直接去获取!
步骤:
- 要在工程中添加spark-hive的依赖jar
- 要在工程中添加mysql连接驱动依赖jar
- 下载三个文件 core-site.xml hdfs-site.xml hive-site.xml 到项目的resources下
依赖如下
doitedu-hange org.example 1.0-SNAPSHOT 4.0.0 hudiorg.apache.hudi hudi-client0.8.0 pom org.apache.hudi hudi-spark-bundle_2.120.8.0 org.apache.hudi hudi-hadoop-mr-bundlejackson-databind com.fasterxml.jackson.corecom.fasterxml.jackson.core jackson-annotations0.8.0 org.apache.spark spark-core_2.123.0.0 org.apache.spark spark-sql_2.123.0.0 org.apache.spark spark-hive_2.123.0.0 org.apache.spark spark-avro_2.123.0.0 org.apache.hadoop hadoop-client2.7.2 org.spark-project.hive hive-jdbc1.2.1.spark2 org.apache.spark spark-hive_2.123.0.0 mysql mysql-connector-java5.1.48
创建sparksession时需要调用.enableHiveSupport( )方法
System.setProperty("HADOOP_USER_NAME","root") ;
val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("datanucleus.schema.autoCreateTables","true")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
代码如下
object LoadDadaFromHIve {
def main(args: Array[String]): Unit = {
// 获取Sparksession 对象
val session = SparkSession.builder()
.master("local[*]")
.appName("hive_demo")
.enableHiveSupport() // 开启对hive的支持
.getOrCreate()
session.sql("select * from doit14.emp") .show() //
session.stop()
}
}
注意: 启动mysql的元数据服务和hiveserver2服务
这一步执行成功以后 , 说明我们的项目环境是可以操作HIVE的 , 后面的就可以写HUDI的时候同步数据到HIVE表中
2 spark写数据到Hudi添加依赖 , 准备环境
object OpHive {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root");
val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//1 获取sparksession对象
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
import org.apache.spark.sql.functions._
// 2 加载数据 创建DF
val commitTime = System.currentTimeMillis().toString //生成提交时间
val schema = new StructType().add("id", DataTypes.StringType).add("name", DataTypes.StringType).add("age", DataTypes.IntegerType).add("city", DataTypes.StringType).add("score", DataTypes.DoubleType)
val df: Dataframe = sparkSession.read.schema(schema).csv("/user.txt")
.withColumn("ts", lit(commitTime))
//3 将DF 数据写到到hudi中
df.write.format("hudi")
.options(getQuickstartWriteConfigs()).
// 设置表类型 COW 概念在后面会涉及到
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(TABLE_NAME, "tb_user_f") //hudi中的表
.mode(Append).
//hudi保存数据的地址
save("/hudi/tb_hudi_user_f");
}
}
查看指定目录中的数据
3 Spark写数据到Hudi同步到Hivepackage com.doitedu.demo
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}
object OpHive_NoPartition {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root");
val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("datanucleus.schema.autoCreateTables", "true")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
insertData(sparkSession);
}
def insertData(sparkSession: SparkSession) = {
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions._
val commitTime = System.currentTimeMillis().toString //生成提交时间
val schema = new StructType()
.add("id", DataTypes.StringType)
.add("name", DataTypes.StringType)
.add("age", DataTypes.IntegerType)
.add("city", DataTypes.StringType)
.add("score", DataTypes.DoubleType)
.add("gender" , DataTypes.StringType)
// 加载数据同时添加一个 时间字段
val df: Dataframe = sparkSession.read.schema(schema).csv("/user3.txt")
.withColumn("ts", lit(commitTime))
df.show
// 注册hive驱动
Class.forName("org.apache.hive.jdbc.HiveDriver")
// 将数据写入到HUDI 同时同步到hive中
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs()).
// 设置表类型 COW 概念在后面会涉及到
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
// .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender")//指定多个分区字段
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY , "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(TABLE_NAME, "tb_user_d") //hudi中的表
.option("hoodie.datasource.hive_sync.enable", true) // 同步数据到hive
.option("hoodie.datasource.hive_sync.table", "tb_user_d") //hive中的表
.option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
// .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // hive分区字段 指定多个分区字段
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.embed.timeline.server", false)
.option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
mode(Append).
save("/hudi/tb_hudi_user_d");
}
}
在hive中查看表
查看hive中表结构



