栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

第2节 测试spark操作hudi 0.9 cdh6.3.2 版本不兼容时

第2节 测试spark操作hudi 0.9 cdh6.3.2 版本不兼容时

spark-shell操作

(1)spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本号需要和spark版本对应,(可以在maven仓库https://mvnrepository.com/查看spark 个版本对应的spark-avro有没有再maven仓),并且使用Hudi编译好的jar包。

 发现spark-avro使用的3.0.0版本scala 是2.12,如果使用的spark 是apache spark3.0.0之后的可以参考第一节编译时使用scala版本取2.12

当前由于是cdh6.3.2,spark版本是2.4.0 使用如下命令启动spark-shell

local 模式 --master local[*] 

[xxx@xxx Hudi]# spark-shell 
--jars packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.9.0.jar 
--packages org.apache.spark:spark-avro_2.11:2.4.4 
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 设置表名
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
 
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
 
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
 
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
 
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
 
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
 
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
 
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow
 
scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@5cdd5ff9
插入数据

新增数据,生成一些数据,将其加载到Dataframe中,然后将Dataframe写入Hudi表

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
scala> df.write.format("hudi").
        options(getQuickstartWriteConfigs).
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, tableName).
        mode(Overwrite).
        save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。

报错

df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Overwrite).save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.DataSourceUtils$.PARTITIONING_COLUMNS_KEY()Ljava/lang/String;
  at org.apache.hudi.DataSourceWriteOptions$.translateSqlOptions(DataSourceOptions.scala:203)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:158)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)

原因是使用的cdh里面的spark版本比编译使用的spark底,仔细查看报错位置

org.apache.spark.sql.execution.datasources.DataSourceUtils

 

 cdh版本的这个spark版本低,缺少一部分依赖的代码,

仔细对比之后发现 多出来的 spark-2.4.4版本的这个代码只被一个地方依赖到,检查代码

val PARTITIONING_COLUMNS_KEY : java.lang.String = {  }
def encodePartitioningColumns(columns : scala.Seq[scala.Predef.String]) : scala.Predef.String = {  }
def decodePartitioningColumns(str : scala.Predef.String) : scala.Seq[scala.Predef.String] = {  }

 

观察这段代码可能暂时用不到,将里面的if 代码注释掉,重新编译,重复上面方法,发现可以写入数据

 [xxx@xxx~]# cd /tmp/hudi_trips_cow/

[xxx@xxx hudi_trips_cow]# ls

americas asia

查询数据
scala> val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "*****/*/*").
     |   select("uuid","partitionpath").
     |   sort("partitionpath","uuid").
     |   show(100, false)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/308206.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号