一、前置知识详解
Spark SQL重要是操作Dataframe,Dataframe本身提供了save和load的操作,
Load:可以创建Dataframe,
Save:把Dataframe中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。
二、Spark SQL读写数据代码实战
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class SparkSQLLoadSaveOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext = new SQLContext(sc);
Dataframe peopleDF = sqlContext.read().format("json").load("E:\Spark\Sparkinstanll_package\Big_Data_Software\spark-1.6.0-bin-hadoop2.6\examples\src\main\resources\people.json");
//通过mode来指定输出文件的是append。创建新文件来追加文件
peopleDF.select("name").write().mode(SaveMode.Append).save("E:\personNames");
}
}
读取过程源码分析如下:
1. read方法返回DataframeReader,用于读取数据。
@Experimental //创建DataframeReader实例,获得了DataframeReader引用 def read: DataframeReader = new DataframeReader(this)
2. 然后再调用DataframeReader类中的format,指出读取文件的格式。
def format(source: String): DataframeReader = {
this.source = source
this
}
3. 通过DtaframeReader中load方法通过路径把传入过来的输入变成Dataframe。
// TODO: Remove this one in Spark 2.0.
def load(path: String): Dataframe = {
option("path", path).load()
}
至此,数据的读取工作就完成了,下面就对Dataframe进行操作。
下面就是写操作!!!
1. 调用Dataframe中select函数进行对列筛选
@scala.annotation.varargs def select(col: String, cols: String*): Dataframe = select((col +: cols).map(Column(_)) : _*)
2. 然后通过write将结果写入到外部存储系统中。
@Experimental def write: DataframeWriter = new DataframeWriter(this)
3. 在保持文件的时候mode指定追加文件的方式
def mode(saveMode: SaveMode): DataframeWriter = {
this.mode = saveMode
this
}
4. 最后,save()方法触发action,将文件输出到指定文件中。
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
save()
}
三、Spark SQL读写整个流程图如下
四、对于流程中部分函数源码详解
DataframeReader.Load()
1. Load()返回Dataframe类型的数据集合,使用的数据是从默认的路径读取。
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): Dataframe = {
//此时的read就是DataframeReader
read.load(path)
}
2. 追踪load源码进去,源码如下:
在DataframeReader中的方法。Load()通过路径把输入传进来变成一个Dataframe。
// TODO: Remove this one in Spark 2.0.
def load(path: String): Dataframe = {
option("path", path).load()
}
3. 追踪load源码如下:
def load(): Dataframe = {
//对传入的Source进行解析
val resolved = ResolvedDataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = Array.empty[String],
provider = source,
options = extraOptions.toMap)
Dataframe(sqlContext, LogicalRelation(resolved.relation))
}
DataframeReader.format()
1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.
def format(source: String): DataframeReader = {
this.source = source //FileType
this
}
Dataframe.write()
1. 创建DataframeWriter实例
@Experimental def write: DataframeWriter = new DataframeWriter(this) 1
2. 追踪DataframeWriter源码如下:
以Dataframe的方式向外部存储系统中写入数据。
@Experimental
final class DataframeWriter private[sql](df: Dataframe) {
DataframeWriter.mode()
1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。
def mode(saveMode: SaveMode): DataframeWriter = {
this.mode = saveMode
this
}
2. 通过模式匹配接收外部参数
def mode(saveMode: String): DataframeWriter = {
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "default" => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
"Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
}
this
}
DataframeWriter.save()
1. save将结果保存传入的路径。
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
save()
}
2. 追踪save方法。
def save(): Unit = {
ResolvedDataSource(
df.sqlContext,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
mode,
extraOptions.toMap,
df)
}
3. 其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
defaultValue = Some("org.apache.spark.sql.parquet"),
doc = "The default data source to use in input/output.")
Dataframe.scala中部分函数详解:
1. toDF函数是将RDD转换成Dataframe
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned Dataframe.
def toDF(): Dataframe = this
2. show()方法:将结果显示出来
// scalastyle:off println def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate)) // scalastyle:on println
追踪showString源码如下:showString中触发action收集数据。
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
val numRows = _numRows.max(0)
val sb = new StringBuilder
val takeResult = take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
val numCols = schema.fieldNames.length
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。



