
Apache Hudi in Practice on AWS Glue

Table of Contents

AWS Glue in Practice
- Glue Job Configuration
- Storing as a Non-Partitioned Table: Python Code, AWS Glue Catalog, Query in Athena, Files in S3 Bucket
- Storing as a Partitioned Table: Python Code, AWS Glue Catalog, Query in Athena, Files in S3 Bucket

FAQ
- ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
- IllegalArgumentException: Partition path default is not in the form yyyy/mm/dd

AWS Glue in Practice

Glue Job Configuration

PS: You will need to work out the IAM permissions yourself; in practice it is a matter of adding whatever permission turns out to be missing. If this is only a test, granting everything (both action and resource set to *) settles it once and for all.
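For a throwaway test environment, that catch-all policy can be attached to the job role in one call. Below is a minimal sketch using boto3; the role name and policy name are placeholders, not from the original setup:

import json
import boto3

iam = boto3.client('iam')

# Wide-open policy: every action on every resource. Test environments only.
policy = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": "*", "Resource": "*"}]
}

iam.put_role_policy(
    RoleName='GlueHudiTestRole',       # placeholder role name
    PolicyName='AllowAllForTesting',   # placeholder policy name
    PolicyDocument=json.dumps(policy),
)

Note that the role itself must also have a trust policy allowing glue.amazonaws.com to assume it before Glue can use it.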

Job Parameters

Key | Value
--conf | spark.serializer=org.apache.spark.serializer.KryoSerializer
--enable-glue-datacatalog | (no value)

Dependent jars path

s3://gavin-test2/dependency_jars/hudi/spark-avro_2.11-2.4.3.jar,s3://gavin-test2/dependency_jars/hudi/hudi-spark-bundle_2.11-0.8.0.jar

Download links for the jars:

Jar | Download link
hudi-spark-bundle_2.11-0.8.0.jar | https://search.maven.org/remotecontent?filepath=org/apache/hudi/hudi-spark-bundle_2.11/0.8.0/hudi-spark-bundle_2.11-0.8.0.jar
spark-avro_2.11-2.4.3.jar | https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-avro_2.11/2.4.3/spark-avro_2.11-2.4.3.jar
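If you prefer scripting the setup over clicking through the console, the same parameters and dependent jars map onto boto3's create_job call. A sketch, assuming a Glue 2.0 (Spark 2.4) job; the job name, role, and script location are placeholders:

import boto3

glue = boto3.client('glue')

glue.create_job(
    Name='hudi-demo-job',      # placeholder job name
    Role='GlueHudiTestRole',   # placeholder IAM role
    GlueVersion='2.0',         # Glue 2.0 ships Spark 2.4, matching the jars above
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://gavin-test2/scripts/hudi_demo.py',  # placeholder script path
        'PythonVersion': '3',
    },
    DefaultArguments={
        '--conf': 'spark.serializer=org.apache.spark.serializer.KryoSerializer',
        '--enable-glue-datacatalog': 'true',
        # "Dependent jars path" in the console corresponds to --extra-jars
        '--extra-jars': 's3://gavin-test2/dependency_jars/hudi/spark-avro_2.11-2.4.3.jar,'
                        's3://gavin-test2/dependency_jars/hudi/hudi-spark-bundle_2.11-0.8.0.jar',
    },
)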
Storing as a Non-Partitioned Table

Python Code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *


args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

basePath = 's3://gavin-test2/tables/hudi/table1/'
table_name = 'table1'
database = 'default'
data = [('Alice', 1, '2022/02/28'), ('Jhone', 2, '2022/03/01')]
rdd = sc.parallelize(data)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("partitioin_path", StringType(), True),
    ]
)
src_df = spark.createDataFrame(rdd, schema)

# Hudi write options for a non-partitioned table: the partition-path and
# partition-fields settings are left empty, and the NonpartitionedKeyGenerator /
# NonPartitionedExtractor pair keeps both the writer and Hive sync partition-free.
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.recordkey.field': 'name',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.hive_sync.partition_fields': '',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor'
}

src_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
AWS Glue Catalog

Query in Athena

_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | name | age | partition_path
20220307080836 | 20220307080836_0_1 | Jhone | (empty) | 8965ef34-4048-4420-8e69-562a478c3989-0_0-13-481_20220307080836.parquet | Jhone | 2 | 2022/03/01
20220307080836 | 20220307080836_0_2 | Alice | (empty) | 8965ef34-4048-4420-8e69-562a478c3989-0_0-13-481_20220307080836.parquet | Alice | 1 | 2022/02/28
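Because Hive sync registered table1 in the Glue Catalog, the same query can also be issued programmatically. A sketch using boto3's Athena client; the result output location is a placeholder:

import boto3

athena = boto3.client('athena')

# Submit the query; Athena runs it asynchronously and writes results to S3.
resp = athena.start_query_execution(
    QueryString='SELECT * FROM table1',
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://gavin-test2/athena-results/'},  # placeholder
)
print(resp['QueryExecutionId'])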
Files in S3 Bucket

Storing as a Partitioned Table

A partitioned table can use either a time field or a non-time field as its partition field; this article uses a time string of the form "yyyy/mm/dd" as the partition field.

Python Code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *


args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

basePath = 's3://gavin-test2/tables/hudi/table2/'
table_name = 'table2'
database = 'default'
data = [('Alice', 1, '2022/02/28'), ('Jhone', 2, '2022/03/01')]
rdd = sc.parallelize(data)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("partitioin_path", StringType(), True),
    ]
)
src_df = spark.createDataFrame(rdd, schema)

# Hudi write options for a partitioned table: the slash-encoded date column
# drives both the storage layout (SimpleKeyGenerator) and the Hive partition
# values recovered at sync time (SlashEncodedDayPartitionValueExtractor).
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.recordkey.field': 'name',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.write.partitionpath.field': 'partition_path',
    'hoodie.datasource.hive_sync.partition_fields': 'partition_path',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor'
}

src_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
AWS Glue Catalog

Query in Athena

_hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | name | age | partition_path
20220307084134 | 20220307084134_0_1 | Jhone | 2022/03/01 | afb8d8e3-5f3e-4420-b390-1eeb20a59165-0_0-8-322_20220307084134.parquet | Jhone | 2 | 2022-03-01
20220307084134 | 20220307084134_1_1 | Alice | 2022/02/28 | 351c3c19-3c69-4661-8051-11a90c031112-0_1-8-323_20220307084134.parquet | Alice | 1 | 2022-02-28

Note that partition_path comes back as 2022-03-01 rather than the original 2022/03/01: SlashEncodedDayPartitionValueExtractor converts the slash-encoded storage path into a yyyy-mm-dd partition value during Hive sync.
Files in S3 Bucket

FAQ

ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem

Error Info

This appears when Hudi is configured with DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY -> "false"; the exact error is:

java.lang.ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem

This is caused by Hive 3/Spark 3 having dropped their dependency on the calcite package.

Solution:

I took the lazy way out and downgraded Spark to 2.x.

IllegalArgumentException: Partition path default is not in the form yyyy/mm/dd

Error Info

Caused by: java.lang.IllegalArgumentException: Partition path default is not in the form yyyy/mm/dd 
	at org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor.extractPartitionValuesInPath(SlashEncodedDayPartitionValueExtractor.java:55)
	at org.apache.hudi.hive.HoodieHiveClient.getPartitionEvents(HoodieHiveClient.java:220)
	at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:221)
	... 42 more

Solution

The job sets 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor', and that extractor requires every partition path to match exactly this date format; see:

org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor#extractPartitionValuesInPath
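That method splits the storage path on '/' and expects exactly three date components, so any other layout fails, including the literal 'default' fallback path Hudi writes when a partition value is missing. If the partition column is not a slash-encoded date, one common pairing is MultiPartKeysValueExtractor; a hedged sketch of the alternative configuration (my suggestion, not from the original post):

# For an arbitrary (non-date, single-level) partition column, sync the raw
# partition value instead of parsing it as yyyy/mm/dd.
hudi_options.update({
    'hoodie.datasource.write.partitionpath.field': 'partition_path',
    'hoodie.datasource.hive_sync.partition_fields': 'partition_path',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.MultiPartKeysValueExtractor',
})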

