(1)spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本号需要和spark版本对应,(可以在maven仓库https://mvnrepository.com/查看spark 个版本对应的spark-avro有没有再maven仓),并且使用Hudi编译好的jar包。
发现spark-avro使用的3.0.0版本scala 是2.12,如果使用的spark 是apache spark3.0.0之后的可以参考第一节编译时使用scala版本取2.12
当前由于是cdh6.3.2,spark版本是2.4.0 使用如下命令启动spark-shell
local 模式 --master local[*]
[xxx@xxx Hudi]# spark-shell --jars packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.9.0.jar --packages org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'设置表名
scala> import org.apache.hudi.QuickstartUtils._ import org.apache.hudi.QuickstartUtils._ scala> import scala.collection.JavaConversions._ import scala.collection.JavaConversions._ scala> import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.SaveMode._ scala> import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceReadOptions._ scala> import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DataSourceWriteOptions._ scala> import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.config.HoodieWriteConfig._ scala> val tableName = "hudi_trips_cow" tableName: String = hudi_trips_cow scala> val basePath = "file:///tmp/hudi_trips_cow" basePath: String = file:///tmp/hudi_trips_cow scala> val dataGen = new DataGenerator dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@5cdd5ff9插入数据
新增数据,生成一些数据,将其加载到Dataframe中,然后将Dataframe写入Hudi表
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
scala> df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。
报错
df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Overwrite).save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.DataSourceUtils$.PARTITIONING_COLUMNS_KEY()Ljava/lang/String;
at org.apache.hudi.DataSourceWriteOptions$.translateSqlOptions(DataSourceOptions.scala:203)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:158)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
原因是使用的cdh里面的spark版本比编译使用的spark底,仔细查看报错位置
org.apache.spark.sql.execution.datasources.DataSourceUtils
cdh版本的这个spark版本低,缺少一部分依赖的代码,
仔细对比之后发现 多出来的 spark-2.4.4版本的这个代码只被一个地方依赖到,检查代码
val PARTITIONING_COLUMNS_KEY : java.lang.String = { }
def encodePartitioningColumns(columns : scala.Seq[scala.Predef.String]) : scala.Predef.String = { }
def decodePartitioningColumns(str : scala.Predef.String) : scala.Seq[scala.Predef.String] = { }
观察这段代码可能暂时用不到,将里面的if 代码注释掉,重新编译,重复上面方法,发现可以写入数据
查询数据[xxx@xxx~]# cd /tmp/hudi_trips_cow/
[xxx@xxx hudi_trips_cow]# ls
americas asia
scala> val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "*****/*/*").
| select("uuid","partitionpath").
| sort("partitionpath","uuid").
| show(100, false)


