栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 数据库 > MySQL > MsSql

Spark SQL数据加载和保存实例讲解

MsSql 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark SQL数据加载和保存实例讲解

一、前置知识详解
Spark SQL重要是操作Dataframe,Dataframe本身提供了save和load的操作,
Load:可以创建Dataframe,
Save:把Dataframe中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。

二、Spark SQL读写数据代码实战

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkSQLLoadSaveOps {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
  JavaSparkContext sc = new JavaSparkContext(conf);
  SQLContext = new SQLContext(sc);
  
  Dataframe peopleDF = sqlContext.read().format("json").load("E:\Spark\Sparkinstanll_package\Big_Data_Software\spark-1.6.0-bin-hadoop2.6\examples\src\main\resources\people.json");

  
  //通过mode来指定输出文件的是append。创建新文件来追加文件
 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\personNames");
 }
}

读取过程源码分析如下:
1. read方法返回DataframeReader,用于读取数据。


@Experimental
//创建DataframeReader实例,获得了DataframeReader引用
def read: DataframeReader = new DataframeReader(this)

2.  然后再调用DataframeReader类中的format,指出读取文件的格式。


def format(source: String): DataframeReader = {
 this.source = source
 this
}

3.  通过DtaframeReader中load方法通过路径把传入过来的输入变成Dataframe。


// TODO: Remove this one in Spark 2.0.
def load(path: String): Dataframe = {
 option("path", path).load()
}

至此,数据的读取工作就完成了,下面就对Dataframe进行操作。
下面就是写操作!!!

1. 调用Dataframe中select函数进行对列筛选


@scala.annotation.varargs
def select(col: String, cols: String*): Dataframe = select((col +: cols).map(Column(_)) : _*)

2.  然后通过write将结果写入到外部存储系统中。


@Experimental
def write: DataframeWriter = new DataframeWriter(this)

3.   在保持文件的时候mode指定追加文件的方式


def mode(saveMode: SaveMode): DataframeWriter = {
 this.mode = saveMode
 this
}

4.   最后,save()方法触发action,将文件输出到指定文件中。


def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

三、Spark SQL读写整个流程图如下

四、对于流程中部分函数源码详解

DataframeReader.Load()

1. Load()返回Dataframe类型的数据集合,使用的数据是从默认的路径读取。


@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): Dataframe = {
//此时的read就是DataframeReader
 read.load(path)
}

2.  追踪load源码进去,源码如下:
在DataframeReader中的方法。Load()通过路径把输入传进来变成一个Dataframe。


// TODO: Remove this one in Spark 2.0.
def load(path: String): Dataframe = {
 option("path", path).load()
}

3.  追踪load源码如下:


def load(): Dataframe = {
//对传入的Source进行解析
 val resolved = ResolvedDataSource(
  sqlContext,
  userSpecifiedSchema = userSpecifiedSchema,
  partitionColumns = Array.empty[String],
  provider = source,
  options = extraOptions.toMap)
 Dataframe(sqlContext, LogicalRelation(resolved.relation))
}

DataframeReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.


def format(source: String): DataframeReader = {
 this.source = source //FileType
 this
}

Dataframe.write()

1. 创建DataframeWriter实例


@Experimental
def write: DataframeWriter = new DataframeWriter(this)
1

2.  追踪DataframeWriter源码如下:
以Dataframe的方式向外部存储系统中写入数据。


@Experimental
final class DataframeWriter private[sql](df: Dataframe) {

DataframeWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。


def mode(saveMode: SaveMode): DataframeWriter = {
 this.mode = saveMode
 this
}

2.  通过模式匹配接收外部参数


def mode(saveMode: String): DataframeWriter = {
 this.mode = saveMode.toLowerCase match {
  case "overwrite" => SaveMode.Overwrite
  case "append" => SaveMode.Append
  case "ignore" => SaveMode.Ignore
  case "error" | "default" => SaveMode.ErrorIfExists
  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
 }
 this
}

DataframeWriter.save()

1. save将结果保存传入的路径。


def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

2.  追踪save方法。


def save(): Unit = {
 ResolvedDataSource(
  df.sqlContext,
  source,
  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
  mode,
  extraOptions.toMap,
  df)
}

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
 defaultValue = Some("org.apache.spark.sql.parquet"),
 doc = "The default data source to use in input/output.")

Dataframe.scala中部分函数详解:

1. toDF函数是将RDD转换成Dataframe


// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned Dataframe.
def toDF(): Dataframe = this

2.  show()方法:将结果显示出来


// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println

追踪showString源码如下:showString中触发action收集数据。


private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
 val numRows = _numRows.max(0)
 val sb = new StringBuilder
 val takeResult = take(numRows + 1)
 val hasMoreData = takeResult.length > numRows
 val data = takeResult.take(numRows)
 val numCols = schema.fieldNames.length

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/169081.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号