栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark SQL Guide——Data Sources

Spark SQL Guide——Data Sources

文章目录

Parquet Files

Partition Discovery(解析分区信息)Schema Merging(模式归并)Hive metastore Parquet table conversion(Hive metastore Parquet表转换)Columnar Encryption(列式加密) Hive Tables

Specifying storage format for Hive tables(指定Hive表的存储格式)Interacting with Different Versions of Hive metastore(与不同版本的Hive metastore互动)

Parquet Files

Parquet是一种列式格式,许多其他数据处理系统都支持这种格式。Spark SQL支持读取和写入Parquet文件,自动保留原始数据的schema。读取Parquet文件时,出于兼容性的原因,所有列都自动转换为可空的。

Partition Discovery(解析分区信息)

表分区是Hive等系统中常用的一种优化方法。在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中。所有内置的文件源(包括Text/CSV/JSON/ORC/Parquet)都能够自动发现和推断分区信息。例如,我们可以将所有之前使用的人口数据存储到一个分区表中,使用以下目录结构,并添加两列,性别和国家作为分区列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过将/to/table路径传递给SparkSession.read.parquet或SparkSession.read,Spark SQL会自动从路径中提取分区信息。现在返回的Dataframe的模式变成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断的。 目前支持数字数据类型、日期、时间戳和字符串类型。 有时用户可能不想自动推断分区列的数据类型。 对于这些用例,可以通过 spark.sql.sources.partitionColumnTypeInference.enabled 配置自动类型推断,默认为 true。 当类型推断被禁用时,字符串类型将用于分区列。
从 Spark 1.6.0 开始,解析分区信息默认只查找给定路径下的分区。 对于上面的示例,如果用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或 SparkSession.read.load,则不会将gender视为分区列。 如果用户需要指定解析分区信息的起始路径,可以在数据源选项中设置basePath。 例如,当 path/to/table/gender=male 是数据的路径并且用户将 basePath 设置为 path/to/table/ 时,gender 将是一个分区列。

Schema Merging(模式归并)

与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持schema演化。 用户可以从一个简单的模式开始,然后根据需要逐渐向schema中添加更多列。 这样,用户最终可能会得到多个 Parquet 文件,这些文件具有不同但相互兼容的schema。 Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的schema。
因为模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,所以我们从1.5.0开始默认关闭了它。你可以通过
1.读取Parquet文件时将数据源选项mergeSchema设置为true(如下面的例子所示),或者
2.将全局SQL选项spark.sql.parquet.mergeSchema设置为true。

// This is used to implicitly convert an RDD to a Dataframe.
import spark.implicits._

// Create a simple Dataframe, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another Dataframe in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
Hive metastore Parquet table conversion(Hive metastore Parquet表转换)

当读取Hive metastore Parquet表和写入到非分区的Hive metastore Parquet表时,Spark SQL将尝试使用自己的Parquet支持,而不是Hive SerDe,以获得更好的性能。此行为由spark.sql.hive.convertmetastoreParquet配置控制,默认情况下是开启的。

Hive/Parquet Schema Reconciliation:
从表Schema处理的角度来看,Hive和Parquet之间有两个关键的区别。
1.Hive是大小写敏感的,但Parquet相反。
2.Hive会将所有列视为nullable,但是nullability在parquet里有独特的意义。
因此,在将Hive metastore Parquet表转换为Spark SQL Parquet表时,必须将Hive metastore schema和Parquet schema进行统一。和解规则如下:
1.有相同名字的字段必须要有相同的数据类型,忽略nullability。兼容处理的字段应该保持Parquet侧的数据类型,这样就可以处理到nullability类型了(空值问题)。
2.兼容处理的schema应只包含在Hive元数据里的schema信息,主要体现在以下两个方面:
(1)只出现在Parquet schema的字段会被忽略。
(2)只出现在Hive元数据里的字段将会被视为nullable,并处理到兼容后的schema中。

metadata Refreshing:
Spark SQL缓存Parquet元数据以提高性能。启用Hive metastore Parquet表转换功能后,转换后的数据也会被缓存。如果这些表通过Hive或其他外部工具更新,则需要手动刷新,以保证元数据的一致性。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
Columnar Encryption(列式加密)

从Spark 3.2开始,Apache Parquet 1.12+支持Parquet表的列式加密。

Hive Tables

Spark SQL还支持对Apache Hive中存储的数据进行读写。但是,由于Hive有大量的依赖项,这些依赖项没有包含在默认的Spark分发包中。如果Hive依赖项可以在类路径中找到,Spark会自动加载它们。注意,这些Hive依赖关系也必须出现在所有的工作节点上,因为它们需要访问Hive序列化和反序列化库(SerDes)来访问存储在Hive中的数据。
Hive的配置是将Hive-site.xml、core-site.xml(安全配置)和HDFS-site.xml (HDFS配置)文件放在conf/中。
使用Hive时,必须在Hive支持的情况下实例化SparkSession,包括连接到一个持久化的Hive metastore,支持Hive serdes,以及Hive用户定义函数。
没有Hive部署的用户仍然可以启用Hive支持。在没有hive-site.xml配置的情况下,context会自动在当前目录下创建metastore_db,并创建一个由spark.sql.warehouse.dir配置的目录。默认为Spark应用启动的当前目录下的Spark-warehouse目录。注意hive-site.xml中的hive.metastore.warehouse.dir属性在Spark 2.0.0之后已经弃用。相反,可以使用spark.sql.warehouse.dir指定数据库在仓库中的默认位置。您可能需要向启动Spark应用程序的用户授予写权限。

Specifying storage format for Hive tables(指定Hive表的存储格式)

创建Hive表时,需要定义表从文件系统中读写数据的方式,即输入格式和输出格式。您还需要定义这个表应该如何将数据反序列化为行,或将行序列化为数据,即serde。下面的选项可以用来指定存储格式(serde,输入格式,输出格式),例如CREATE TABLE src(id int) USING hive options (fileFormat ‘parquet’)。默认情况下,我们将以纯文本形式读取表文件。需要注意的是,创建表时还不支持Hive storage handler,可以在Hive侧使用storage handler创建表,然后通过Spark SQL读取。

Interacting with Different Versions of Hive metastore(与不同版本的Hive metastore互动)

Spark SQL支持Hive最重要的一点就是与Hive metastore的交互,这使得Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,一个Spark SQL的二进制构建可以用来查询不同版本的Hive metastore,使用下面描述的配置。需要注意的是,除了用来与metastore对话的Hive版本外,Spark SQL在内部会编译内置的Hive,并使用这些类进行内部执行(serdes, udf, udaf等)。

参考文档:
Spark SQL Guide
Hive SerDe

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/722542.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号