Spark内部已经实现了很多常用数据源的适配,对于不支持的自定义的数据源,也提供了相应的接口。最近刚好遇到需要通过Spark读取在HDFS上的自定义文件格式的需求,网上找到的很多资料都以实现 CreatableRelationProvider 接口、继承 baseRelation 类的方式来实现,这种方式数据读取时需要实现以下三个 trait 中的一个:
@InterfaceStability.Stable
trait TableScan {
def buildScan(): RDD[Row]
}
@InterfaceStability.Stable
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
@InterfaceStability.Stable
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
经过简单测试,功能上没有问题,但在测试过程中发现在不带条件情况下(带条件情况下可以自己实现谓词下推)无论提取多少行数据,数据文件总会被全量读取(例如 limit(5),也将查询所有数据,再展示5行,我希望是可以逐行读取处理,只读取5行),导致数据查询耗时长,产生多余的IO消耗,并且由于实际场景中数据量较大,测试还出现了内存溢出的情况 。
根据Spark org.apache.spark.sql.execution.datasources.DataSource 类中 lookupDataSource 函数源码(关键代码如下),加载自定义的数据源只需要将类名定义为 DefaultSource,并实现 DataSourceRegister 接口 ,
val provider2 = s"$provider1.DefaultSource"
Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) =>
// Found the data source using fully qualified path
dataSource
//其它代码省略
}
而通过以下代码可以看出现自定义数据源除了实现CreatableRelationProvider 以外还有实现 FileFormat 的方式
def writeAndRead(mode: SaveMode, data: LogicalPlan): baseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(
sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
case format: FileFormat =>
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
// Replace the schema with that of the Dataframe we just wrote out to avoid re-inferring
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
在 FileFormat 接口中,常用接口函数有以下三个:
//当用户没有主动指定Schema时调用该方法返回,所以默认情况下该方法应该返回文件的schema信息给SPARK使用
def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType]
def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory
protected def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
}
inferSchema :当用户没有主动指定Schema时调用该方法返回,所以默认情况下该方法应该返回文件的schema信息给SPARK使用。
prepareWrite:需要构造一个OutputWriterFactory,主要提供了文件的Writer实例(Writer实例需要是org.apache.spark.sql.execution.datasources.OutputWriter的实现类),和文件的后缀定义。
buildReader:返回一个输入参数为PartitionedFile,返回值为Iterator[InternalRow]的函数 , PartitionedFile是SPARK包装后的File类,如果自定义文件支持split,可实现FileFormat的 buildReaderWithPartitionValues ,这里不展开。
总结:spark对于自定义格式的File,提供了FileFormat接口对文件进行读写操作,只需要实现以上三个函数即可简单完成文件的迭代读取,虽然接口简单,但在具体实现中,文件的读写主要实现逻辑需要单独实现, 需要注意多文件的Schema合并,以及不存在的字段等问题的处理。



