
spark infer parquet schema

Background: I recently needed to parse schema information from Parquet files, so I studied Spark SQL's infer-parquet-schema code. Notes below.

1. The infer schema entry point

Package: org.apache.spark.sql.execution.datasources.parquet

Entry class: ParquetFileFormat is Spark SQL's data source implementation for the Parquet format. It extends FileFormat, alongside similar classes such as OrcFileFormat and AvroFileFormat.

Function:

override def inferSchema(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  ParquetUtils.inferSchema(sparkSession, parameters, files)
}

The function delegates to ParquetUtils.inferSchema.

That class computes val shouldMergeSchemas = parquetOptions.mergeSchema, which decides whether the schemas inferred from individual files should be merged. It can be set per read via the mergeSchema data source option, with the spark.sql.parquet.mergeSchema configuration as the session-level default.
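As a minimal sketch of that resolution (the mergeSchema option key is real; the helper name is mine): the per-read data source option wins, otherwise the session default applies.

```scala
// Sketch of how the mergeSchema flag is resolved: the per-read data source
// option takes precedence, falling back to the session-level default
// (spark.sql.parquet.mergeSchema). The helper name is illustrative.
def shouldMergeSchemas(options: Map[String, String], sessionDefault: Boolean): Boolean =
  options.get("mergeSchema").map(_.toBoolean).getOrElse(sessionDefault)
```

In practice that means spark.read.option("mergeSchema", "true").parquet(path) turns merging on for a single read regardless of the session configuration.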

Reading further, ParquetUtils calls back into ParquetFileFormat:

ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
The mergeSchemasInParallel function:
def mergeSchemasInParallel(
      parameters: Map[String, String],
      filesToTouch: Seq[FileStatus],
      sparkSession: SparkSession): Option[StructType] = {
    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp

    val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
      // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
      val converter = new ParquetToSparkSchemaConverter(
        assumeBinaryIsString = assumeBinaryIsString,
        assumeInt96IsTimestamp = assumeInt96IsTimestamp)

      readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
        .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
    }

    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
  }
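The shape of this function is worth noting: the two session flags are read on the driver and captured by value in a reader closure, which is then handed to tasks that each process their own slice of files. A stripped-down sketch of that pattern (names and string output are mine, purely illustrative):

```scala
// Driver-side flags are captured by value in a closure; each task then applies
// the closure to its own slice of files. Names here are illustrative only.
def makeFooterReader(assumeBinaryIsString: Boolean,
                     assumeInt96IsTimestamp: Boolean): Seq[String] => Seq[String] = {
  files =>
    files.map(f =>
      s"$f: binaryAsString=$assumeBinaryIsString, int96AsTimestamp=$assumeInt96IsTimestamp")
}
```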

This is the method where the Parquet schemas actually get parsed. readParquetFootersInParallel reads each Parquet file's footer, and its readFooter call uses the parquet-hadoop API to obtain the schema information:

private[parquet] def readParquetFootersInParallel(
      conf: Configuration,
      partFiles: Seq[FileStatus],
      ignoreCorruptFiles: Boolean): Seq[Footer] = {
    ThreadUtils.parmap(partFiles, "readingParquetFooters", 8) { currentFile =>
      try {
        // Skips row group information since we only need the schema.
        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
        // when it can't read the footer.
        Some(new Footer(currentFile.getPath(),
          ParquetFileReader.readFooter(
            conf, currentFile, SKIP_ROW_GROUPS)))
      } catch { case e: RuntimeException =>
        if (ignoreCorruptFiles) {
          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
          None
        } else {
          throw new IOException(s"Could not read footer for file: $currentFile", e)
        }
      }
    }.flatten
  }

As walked through above, the footers (and thus the schemas) are read via the Parquet reader's readFooter method.
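The control flow of readParquetFootersInParallel is worth isolating: try each file, drop corrupt ones when ignoreCorruptFiles is set, otherwise rethrow as IOException. It can be sketched without the Parquet and Hadoop dependencies (the read parameter stands in for ParquetFileReader.readFooter, and the real code runs over 8 threads via ThreadUtils.parmap rather than sequentially):

```scala
import java.io.IOException
import scala.util.{Failure, Success, Try}

// Toy version of the read-footers loop: a RuntimeException from a corrupt file
// is either swallowed (ignoreCorruptFiles = true) or wrapped in an IOException;
// any other exception propagates unchanged, matching the catch clause above.
def readFooters[A](files: Seq[String], ignoreCorruptFiles: Boolean)
                  (read: String => A): Seq[A] =
  files.flatMap { file =>
    Try(read(file)) match {
      case Success(footer) => Some(footer)
      case Failure(e: RuntimeException) =>
        if (ignoreCorruptFiles) None
        else throw new IOException(s"Could not read footer for file: $file", e)
      case Failure(e) => throw e
    }
  }
```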

2. Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]

The other important piece of logic is converting the Parquet MessageType into Spark SQL's StructType.

def readSchemaFromFooter(
    footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
  val fileMetaData = footer.getParquetMetadata.getFileMetaData
  fileMetaData
    .getKeyValueMetaData
    .asScala.toMap
    .get(ParquetReadSupport.SPARK_METADATA_KEY)
    .flatMap(deserializeSchemaString)
    .getOrElse(converter.convert(fileMetaData.getSchema))
}
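The lookup order above is the interesting part: if Spark wrote the file, the footer's key-value metadata carries a JSON-serialized Spark schema under ParquetReadSupport.SPARK_METADATA_KEY, and deserializing it is preferred over converting the raw Parquet types. A sketch with plain strings (the key string matches what I believe SPARK_METADATA_KEY resolves to; the function name is mine):

```scala
// Prefer the Spark-serialized schema stored in the footer's key-value metadata;
// only fall back to converting the raw Parquet MessageType when it is absent.
// The key string mirrors ParquetReadSupport.SPARK_METADATA_KEY in Spark.
val SparkMetadataKey = "org.apache.spark.sql.parquet.row.metadata"

def schemaFromFooter(keyValueMetadata: Map[String, String])
                    (convertParquetSchema: => String): String =
  keyValueMetadata.get(SparkMetadataKey).getOrElse(convertParquetSchema)
```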

The convert call in the fallback above performs that conversion; the method lives in the class:

org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

The relevant code:

  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())

  private def convert(parquetSchema: GroupType): StructType = {
    val fields = parquetSchema.getFields.asScala.map { field =>
      field.getRepetition match {
        case OPTIONAL =>
          StructField(field.getName, convertField(field), nullable = true)

        case REQUIRED =>
          StructField(field.getName, convertField(field), nullable = false)

        case REPEATED =>
          // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
          // annotated by `LIST` or `MAP` should be interpreted as a required list of required
          // elements where the element type is the type of the field.
          val arrayType = ArrayType(convertField(field), containsNull = false)
          StructField(field.getName, arrayType, nullable = false)
      }
    }

    StructType(fields)
  }

In essence, the logic maps each Parquet type to a Spark SQL DataType. I won't paste the rest of the conversion code here; interested readers can study it in the class above.
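The repetition-to-nullability mapping in convert can be summarized in a self-contained sketch (the trait and case class names here are mine, not Spark's):

```scala
// Models how a Parquet field's repetition level determines the resulting
// Spark shape: OPTIONAL -> nullable, REQUIRED -> non-nullable, and a bare
// REPEATED field -> a required array of required elements.
sealed trait SparkShape
case class Scalar(nullable: Boolean) extends SparkShape
case class ArrayOf(elementNullable: Boolean, nullable: Boolean) extends SparkShape

def shapeFor(repetition: String): SparkShape = repetition match {
  case "OPTIONAL" => Scalar(nullable = true)
  case "REQUIRED" => Scalar(nullable = false)
  case "REPEATED" => ArrayOf(elementNullable = false, nullable = false)
  case other      => sys.error(s"unknown repetition: $other")
}
```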
