
Getting Started with Hudi Operations

1. Build the Environment

Versions:
Spark 2.4.5
Hudi 0.11.0

> git clone https://github.com/apache/hudi.git && cd hudi
> vim pom.xml
> # configure the Aliyun Maven mirror (optional; speeds up dependency downloads)
> mvn clean package -DskipTests -DskipITs
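The Aliyun mirror mentioned above is usually configured in `~/.m2/settings.xml` (inside the `<mirrors>` section) rather than in the project `pom.xml`; a commonly used entry looks like the sketch below (the `id` and `name` values are arbitrary labels):

```xml
<!-- ~/.m2/settings.xml — route all repository requests through the Aliyun mirror -->
<mirror>
  <id>aliyunmaven</id>
  <mirrorOf>*</mirrorOf>
  <name>Aliyun public mirror</name>
  <url>https://maven.aliyun.com/repository/public</url>
</mirror>
```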
2. Quick Start via spark-shell
> ./spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/xxx/cloudera/lib/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.11.0-SNAPSHOT.jar
2.1 Insert Data
Welcome to
      ____              __
     / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

-- set the table name
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow

-- set the base path
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow

-- data generator
scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@27103726

-- insert data: generate some records, load them into a DataFrame, then write the DataFrame to the Hudi table
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1645671241937, "uuid": "b8799383-31c8-4a4f-9268-5155f1f1f262", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1645725241890, "uuid": "aa974083-54ee-421e-9f99-a309f3d9226a", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1646141930102, "uuid": "37f684f9-9d33-44ac-9bcd-97160aabde34", "rider": "rider-213", "driver": "driver-213", "begin_lat...
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

-- mode(Overwrite) drops and recreates the table if it already exists; afterwards you can check /tmp/hudi_trips_cow for the generated data
scala> df.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Overwrite).
     |   save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
22/03/03 10:12:48 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
22/03/03 10:12:48 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
22/03/03 10:12:48 WARN hudi.HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
22/03/03 10:12:49 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:///tmp/hudi_trips_cow/.hoodie/metadata
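The three `option(...)` calls above define Hudi's key semantics: the record key (`uuid`) identifies a row, the precombine field (`ts`) decides which version wins when a batch contains duplicates of the same key, and the partition path field chooses the storage directory. A minimal plain-Scala sketch of that behavior (a hypothetical simulation for illustration, not Hudi's actual implementation):

```scala
// Simplified sketch of Hudi's upsert semantics (NOT Hudi code).
object HudiKeySketch {
  case class Trip(uuid: String, ts: Long, partitionpath: String, fare: Double)

  // Precombine step: within an incoming batch, keep only the record with the
  // largest precombine value (ts) for each record key (uuid).
  def precombine(batch: Seq[Trip]): Seq[Trip] =
    batch.groupBy(_.uuid).values.map(_.maxBy(_.ts)).toSeq

  // Upsert step: merge the de-duplicated batch into the table, keyed by uuid;
  // an existing row with the same key is replaced.
  def upsert(table: Map[String, Trip], batch: Seq[Trip]): Map[String, Trip] =
    precombine(batch).foldLeft(table)((t, r) => t.updated(r.uuid, r))
}
```

So if a batch carries two versions of the same `uuid`, only the one with the larger `ts` reaches the table, which is why `ts` is set as the precombine field in the write above.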

Inspect the /tmp/hudi_trips_cow path:

2.2 Query Data
scala> val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> tripsSnapshotDF.registerTempTable("hudi_trips_snapshot")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res27: Long = 10
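The load path ends in `"/*/*/*/*"` because the demo's partition field holds three directory levels (e.g. `americas/brazil/sao_paulo`): three `*` segments match the partition directories and the final `*` matches the data files under them. A small illustrative sketch (hypothetical helper, not a Hudi API) of that depth matching:

```scala
// Why basePath + "/*/*/*/*": three partition levels plus the data files.
object GlobSketch {
  // A glob of n "*" segments matches relative paths exactly n segments deep.
  def matchesDepth(relativePath: String, globDepth: Int): Boolean =
    relativePath.split("/").length == globDepth

  // Example data-file path relative to the base path.
  val dataFile = "americas/brazil/sao_paulo/part-0001.parquet"
}
```

With a single-level partition column, the path would need only `"/*/*"` instead; newer Hudi releases can also load `basePath` directly without globs.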
Source: reprinted from www.mshxw.com — https://www.mshxw.com/it/751152.html