版本:
spark 2.4.5
hudi 0.11.0
> git clone https://github.com/apache/hudi.git && cd hudi > vim pom.xml > 配置阿里云maven镜像库 > mvn clean package -DskipTests -DskipITs

2、通过spark-shell快速启动
> ./spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/xxx/cloudera/lib/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.11.0-SNAPSHOT.jar

2.1、插入数据
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/___/ .__/_,_/_/ /_/_ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
-- 设置表名
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
-- 设置基本路径
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow
-- 数据生成器
scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@27103726
-- 新增数据,生成一些数据,将其加载到Dataframe中,然后将Dataframe写入Hudi表
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1645671241937, "uuid": "b8799383-31c8-4a4f-9268-5155f1f1f262", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1645725241890, "uuid": "aa974083-54ee-421e-9f99-a309f3d9226a", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1646141930102, "uuid": "37f684f9-9d33-44ac-9bcd-97160aabde34", "rider": "rider-213", "driver": "driver-213", "begin_lat...
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
-- mode(Overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trips_cow是否有数据生成
scala> df.write.format("hudi").
| | options(getQuickstartWriteConfigs).
| | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| | option(TABLE_NAME, tableName).
| | mode(Overwrite).
| | save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
22/03/03 10:12:48 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
22/03/03 10:12:48 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
22/03/03 10:12:48 WARN hudi.HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
22/03/03 10:12:49 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:///tmp/hudi_trips_cow/.hoodie/metadata
查看 /tmp/hudi_trips_cow 路径:
scala> val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]
scala> tripsSnapshotDF.registerTempTable("hudi_trips_snapshot")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res27: Long = 8



