Background: I recently needed to parse schema information from parquet files, so I studied the schema-inference code in Spark SQL's parquet support.
1. Entry point of the schema inference code:
Package: org.apache.spark.sql.execution.datasources.parquet
Entry class: ParquetFileFormat, Spark SQL's data source implementation for the parquet format. It extends FileFormat, as do sibling classes such as OrcFileFormat and AvroFileFormat.
Function:
override def inferSchema(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  ParquetUtils.inferSchema(sparkSession, parameters, files)
}
This function delegates to the inferSchema method of ParquetUtils.
That method computes val shouldMergeSchemas = parquetOptions.mergeSchema to decide whether the schemas inferred from individual files need to be merged. It is controlled by the data source option mergeSchema, whose default comes from the configuration spark.sql.parquet.mergeSchema.
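As a quick illustration of what shouldMergeSchemas enables, here is a minimal, self-contained sketch (the paths and column names are made up for this example): two parquet parts with different columns are written under one directory, and reading them back with mergeSchema=true triggers exactly the footer-reading and merging path walked through below.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("merge-schema-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Two parts under the same table directory, with different columns.
Seq((1, 10)).toDF("id", "a").write.parquet("/tmp/merge_demo/part=1")
Seq((2, 20)).toDF("id", "b").write.parquet("/tmp/merge_demo/part=2")

// mergeSchema=true makes Spark read every file's footer and merge the
// per-file schemas; the result contains id, a, b plus the discovered
// partition column `part`. Columns absent from a file read back as null.
val df = spark.read.option("mergeSchema", "true").parquet("/tmp/merge_demo")
df.printSchema()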
Following the call chain, ParquetUtils in turn invokes this function on ParquetFileFormat:
ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
The mergeSchemasInParallel function:
def mergeSchemasInParallel(
    parameters: Map[String, String],
    filesToTouch: Seq[FileStatus],
    sparkSession: SparkSession): Option[StructType] = {
  val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
  val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp

  val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
    // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
    val converter = new ParquetToSparkSchemaConverter(
      assumeBinaryIsString = assumeBinaryIsString,
      assumeInt96IsTimestamp = assumeInt96IsTimestamp)

    readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
      .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
  }

  SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
}
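Note that mergeSchemasInParallel itself does not read any file: it only captures the two session flags (assumeBinaryIsString, assumeInt96IsTimestamp) in a reader closure, then hands everything to SchemaMergeUtils.mergeSchemasInParallel. That helper distributes filesToTouch across the cluster as a Spark job, runs the reader against each group of files on the executors, and folds the per-file StructTypes into one merged schema, failing if two files declare incompatible types for the same column.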
The actual schema parsing happens here: readParquetFootersInParallel reads the footer of each parquet file, and its readFooter call uses the parquet-hadoop API to obtain the schema information.
private[parquet] def readParquetFootersInParallel(
    conf: Configuration,
    partFiles: Seq[FileStatus],
    ignoreCorruptFiles: Boolean): Seq[Footer] = {
  ThreadUtils.parmap(partFiles, "readingParquetFooters", 8) { currentFile =>
    try {
      // Skips row group information since we only need the schema.
      // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
      // when it can't read the footer.
      Some(new Footer(currentFile.getPath(),
        ParquetFileReader.readFooter(
          conf, currentFile, SKIP_ROW_GROUPS)))
    } catch { case e: RuntimeException =>
      if (ignoreCorruptFiles) {
        logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
        None
      } else {
        throw new IOException(s"Could not read footer for file: $currentFile", e)
      }
    }
  }.flatten
}
To recap, the code above reads each parquet file's footer via ParquetFileReader.readFooter, using up to 8 threads (ThreadUtils.parmap); depending on ignoreCorruptFiles, a corrupted footer is either skipped with a warning or rethrown as an IOException.
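To see what this does for a single file, here is a minimal standalone sketch using the same parquet-hadoop API (the path is a placeholder; this readFooter overload is deprecated in recent parquet releases, but it is the call Spark uses here):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader

val conf = new Configuration()
// SKIP_ROW_GROUPS keeps only the file metadata (schema plus key/value
// metadata) and drops row-group details, which is all inference needs.
val footer = ParquetFileReader.readFooter(
  conf, new Path("/path/to/some-file.parquet"), SKIP_ROW_GROUPS)
println(footer.getFileMetaData.getSchema) // Parquet schema in "message { ... }" form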
2. Converting a Parquet MessageType to a Spark SQL StructType
The other key piece of logic is converting Parquet's MessageType (the footer schema) into Spark SQL's StructType:
def readSchemaFromFooter(
    footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
  val fileMetaData = footer.getParquetMetadata.getFileMetaData
  fileMetaData
    .getKeyValueMetaData
    .asScala.toMap
    .get(ParquetReadSupport.SPARK_METADATA_KEY)
    .flatMap(deserializeSchemaString)
    .getOrElse(converter.convert(fileMetaData.getSchema))
}
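One detail worth noting: when a file was written by Spark itself, Spark embeds its own StructType as a JSON string in the footer's key/value metadata under ParquetReadSupport.SPARK_METADATA_KEY ("org.apache.spark.sql.parquet.row.metadata"), and readSchemaFromFooter prefers that embedded schema; converter.convert is only the fallback for files written by other tools. A minimal sketch of what the deserialization amounts to (Spark's deserializeSchemaString additionally falls back to a legacy parser for schemas written by very old Spark versions):

import org.apache.spark.sql.types.{DataType, StructType}

// The value stored under the Spark metadata key is the JSON produced by
// StructType#json; reading it back is essentially DataType.fromJson.
val schemaJson =
  """{"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}}]}"""
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]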
The convert method called above performs the MessageType-to-StructType conversion; it is defined in the class:
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter
The code is as follows:
def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())

private def convert(parquetSchema: GroupType): StructType = {
  val fields = parquetSchema.getFields.asScala.map { field =>
    field.getRepetition match {
      case OPTIONAL =>
        StructField(field.getName, convertField(field), nullable = true)

      case REQUIRED =>
        StructField(field.getName, convertField(field), nullable = false)

      case REPEATED =>
        // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
        // annotated by `LIST` or `MAP` should be interpreted as a required list of required
        // elements where the element type is the type of the field.
        val arrayType = ArrayType(convertField(field), containsNull = false)
        StructField(field.getName, arrayType, nullable = false)
    }
  }

  StructType(fields)
}
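To make the OPTIONAL/REQUIRED mapping concrete, here is a small sketch that parses a hand-written Parquet schema and runs it through the converter, constructed with the same two flags seen in mergeSchemasInParallel above (the constructor has gained extra defaulted parameters across Spark versions, so treat this as illustrative):

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

val parquetSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  required int32 id;
    |  optional binary name (UTF8);
    |}""".stripMargin)

val converter = new ParquetToSparkSchemaConverter(
  assumeBinaryIsString = true,
  assumeInt96IsTimestamp = true)

// REQUIRED -> nullable = false, OPTIONAL -> nullable = true:
// StructType(StructField(id,IntegerType,false), StructField(name,StringType,true))
println(converter.convert(parquetSchema))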
At its core, the remaining work is mapping each Parquet type to a Spark SQL DataType inside convertField. That code is not reproduced here, and interested readers can study it in the class above, but a simplified sketch of its primitive mapping follows.
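A simplified, illustrative excerpt of the kind of mapping convertField performs, not the verbatim Spark code; in the real implementation, logical-type annotations (DECIMAL, DATE, UTF8, ...) refine these defaults:

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.types._

// Base mapping from Parquet primitive types to Spark SQL types.
def sketchPrimitive(typeName: PrimitiveTypeName): DataType = typeName match {
  case BOOLEAN              => BooleanType
  case INT32                => IntegerType   // DateType/DecimalType with annotations
  case INT64                => LongType      // TimestampType/DecimalType with annotations
  case FLOAT                => FloatType
  case DOUBLE               => DoubleType
  case INT96                => TimestampType // only when assumeInt96IsTimestamp = true
  case BINARY               => BinaryType    // StringType when UTF8-annotated
  case FIXED_LEN_BYTE_ARRAY => BinaryType    // DecimalType when DECIMAL-annotated
}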



