栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark通过实现FileFormat方式读取自定义文件格式

spark通过实现FileFormat方式读取自定义文件格式

Spark内部已经实现了很多常用数据源的适配,对于不支持的自定义的数据源,也提供了相应的接口。最近刚好遇到需要通过Spark读取在HDFS上的自定义文件格式的需求,网上找到的很多资料都以实现 CreatableRelationProvider 接口、继承 baseRelation 类的方式来实现,这种方式数据读取时需要实现以下三个 trait 中的一个:

@InterfaceStability.Stable
trait TableScan {
  def buildScan(): RDD[Row]
}

@InterfaceStability.Stable
trait PrunedScan {
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}

@InterfaceStability.Stable
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

经过简单测试,功能上没有问题,但在测试过程中发现在不带条件情况下(带条件情况下可以自己实现谓词下推)无论提取多少行数据,数据文件总会被全量读取(例如 limit(5),也将查询所有数据,再展示5行,我希望是可以逐行读取处理,只读取5行),导致数据查询耗时长,产生多余的IO消耗,并且由于实际场景中数据量较大,测试还出现了内存溢出的情况 。

根据Spark org.apache.spark.sql.execution.datasources.DataSource 类中 lookupDataSource 函数源码(关键代码如下),加载自定义的数据源只需要将类名定义为 DefaultSource,并实现 DataSourceRegister 接口 ,

val provider2 = s"$provider1.DefaultSource"

Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
  case Success(dataSource) =>
    // Found the data source using fully qualified path
    dataSource
    //其它代码省略
}

而通过以下代码可以看出现自定义数据源除了实现CreatableRelationProvider 以外还有实现 FileFormat 的方式

  
  def writeAndRead(mode: SaveMode, data: LogicalPlan): baseRelation = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }

    providingClass.newInstance() match {
      case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(
          sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
      case format: FileFormat =>
        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
        // Replace the schema with that of the Dataframe we just wrote out to avoid re-inferring
        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }

在 FileFormat 接口中,常用接口函数有以下三个:

	//当用户没有主动指定Schema时调用该方法返回,所以默认情况下该方法应该返回文件的schema信息给SPARK使用
	def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]  
	
	def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

	protected def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    throw new UnsupportedOperationException(s"buildReader is not supported for $this")

	
  }

inferSchema :当用户没有主动指定Schema时调用该方法返回,所以默认情况下该方法应该返回文件的schema信息给SPARK使用。
prepareWrite:需要构造一个OutputWriterFactory,主要提供了文件的Writer实例(Writer实例需要是org.apache.spark.sql.execution.datasources.OutputWriter的实现类),和文件的后缀定义。
buildReader:返回一个输入参数为PartitionedFile,返回值为Iterator[InternalRow]的函数 , PartitionedFile是SPARK包装后的File类,如果自定义文件支持split,可实现FileFormat的 buildReaderWithPartitionValues ,这里不展开。

总结:spark对于自定义格式的File,提供了FileFormat接口对文件进行读写操作,只需要实现以上三个函数即可简单完成文件的迭代读取,虽然接口简单,但在具体实现中,文件的读写主要实现逻辑需要单独实现, 需要注意多文件的Schema合并,以及不存在的字段等问题的处理。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751380.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号