Dataset
Dataset 底层(InternalRow) Dataframe
通过隐式转换创建 DF Dataset 和 Dataframe 的异同
Dataframe 就是 Dataset[Row]Row 是什么?Dataframe 和 Dataset 之间的相互转换 如何理解 RDD、Dataframe 和 Dataset(总结)
Dataset
Dataset 是一个强类型,并且类型安全的数据容器,并且提供了结构化查询 API 和类似 RDD 一样的命令式 API 。
如下查询写法:
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
// 方式1: 通过对象来处理
dataset.filter(item => item.age > 10).show()
// 方式2: 通过字段来处理
dataset.filter('age > 10).show()
// 方式3: 通过类似SQL的表达式来处理
dataset.filter("age > 10").show()
Dataset 底层(InternalRow)
Dataset 最底层处理的是对象的序列化形式,通过查看 Dataset 生成的物理执行计划,也就是最终处理的RDD,就可以判定 Dataset 底层处理的是什么形式的数据。
val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd
dataset.queryExecution.toRdd 这个 API 可以看到 Dataset 底层执行的 RDD,这个 RDD 中的范型是 InternalRow ,InternalRow 又称为 Catalyst Row ,是 Dataset 底层的数据结构,也就是说,无论 Dataset 的范型是什么,无论是 Dataset[Person] 还是其它的,其最底层进行处理的数据结构都是 InternalRow 。
所以, Dataset 的范型对象在执行之前,需要通过 Encoder 转换为 InternalRow,在输入之前,需要把 InternalRow 通过 Decoder 转换为范型对象。
Dataset 是一个 Spark 组件,其底层还是 RDD 。Dataset 提供了访问对象中某个字段的能力,不用像 RDD 一样每次都要针对整个对象做操作。Dataset 和 RDD 不同,如果想把 Dataset[T] 转为 RDD[T],则需要对 Dataset 底层的 InternalRow 做转换,是一个比较重量级的操作。
Dataframe
Dataframe 是 SparkSQL 中一个表示关系型数据库中 表 的函数式抽象,其作用是让 Spark 处理大规模结构化数据的时候更加容易,一般 Dataframe 可以处理结构化的数据,或者是半结构化的数据,因为这俩类数据中都可以获取到 Schema 信息,也就是说 Dataframe 中有 Schema 信息,可以像操作表一样操作 Dataframe 。
Dataframe 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 Dataframe 结构的 Schema
Dataframe 支持 SQL 中常见的操作, 例如: select, filter, join, group, sort, join 等
通过隐式转换创建 DF
这种方式本质上是使用 SparkSession 中的隐式转换来进行的。
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
// 必须要导入隐式转换
// 注意: spark 在此处不是包, 而是 SparkSession 对象
import spark.implicits._
val peopleDF: Dataframe = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
toDF 方法可以在 RDD 和 Seq 中使用通过集合创建 Dataframe 的时候, 集合中不仅可以包含样例类, 也可以只有普通数据类型, 后通过指定列名来创建。
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val df1: Dataframe = Seq("nihao", "hello").toDF("text")
df1.show()
val df2: Dataframe = Seq(("a", 1), ("b", 1)).toDF("word", "count")
df2.show()
通过外部集合创建 Dataframe
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
val df = spark.read
.option("header", true)
.csv("dataset/BeijingPM20100101_20151231.csv")
df.show(10)
df.printSchema()
不仅可以从 csv 文件创建 Dataframe, 还可以从 Table, JSON, Parquet 等中创建 Dataframe
总结:
1.Dataframe 是一个类似于关系型数据库表的函数式组件。2.Dataframe 一般处理结构化数据和半结构化数据。3.Dataframe 具有数据对象的 Schema 信息。4.可以使用命令式的 API 操作 Dataframe, 同时也可以使用 SQL 操作 Dataframe。5.Dataframe 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建。
Dataset 和 Dataframe 的异同
Dataframe 就是 Dataset[Row]
相同点:
1.Dataset 中可以使用列来访问数据,Dataframe 也可以2.Dataset 的执行是优化的,Dataframe 也是3.Dataset 具有命令式 API,同时也可以使用 SQL 来访问,Dataframe 也可以使用这俩种不同的方式访问
不同点:
1.Dataframe 表达的含义是一个支持函数式操作的 表,而 Dataset 表达是一个类似 RDD 的东西,Dataset 可以处理任何对象。2.Dataframe 中所存放的是 Row 对象,而 Dataset 中可以存放任何类型对象。
Dataframe 就是 Dataset[Row]Dataset 的范型可以是任意类型
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val df: Dataframe = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
3.Dataframe 的操作方式和 Dataset 是一样的,但是对于强类型操作而言,他们处理的类型不同。
Dataframe 在进行强类型操作的时候,例如 map 算子,其所处理的数据类型永远是 Row但是对于 Dataset 来讲,其中是什么类型,他就处理什么类型
4.Dataframe 只能做到运行时类型检查,Dataset能做到编译和运行时都有类型检查。
1.Dataframe 中存放的数据以 Row 表示,一个 Row 代表一行数据,这和关系型数据库类似2.Dataframe 在进行 map 等操作的时候,Dataframe 不能直接使用 Person 这样的 Scala 对象,所以无法做到编译时检查3.Dataset 表示的具体的某一类对象,例如 Person,所以再进行 map 等操作的时候,传入的是具体的某个 Scala 对象,如果调用错了方法,编译时就会被检查出来
Row 是什么?
Row 对象表示的是一个 行Row 的操作类似于 Scala 中的 Map 数据类型
// 一个对象就是一个对象
val p = People(name = "zhangsan", age = 10)
// 同样一个对象, 还可以通过一个 Row 对象来表示
val row = Row("zhangsan", 10)
// 获取 Row 中的内容
println(row.get(1))
println(row(1))
// 获取时可以指定类型
println(row.getAs[Int](1))
// 同时 Row 也是一个样例类, 可以进行 match
row match {
case Row(name, age) => println(name, age)
}
Dataframe 和 Dataset 之间的相互转换
代码演示:
val spark: SparkSession = new sql.SparkSession.Builder()
.appName("hello")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val df: Dataframe = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
val ds_fdf: Dataset[People] = df.as[People]
val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
val df_fds: Dataframe = ds.toDF()
总结:
1.Dataframe 就是 Dataset, 他们的方式是一样的, 也都支持 API 和 SQL 两种操作方式2.Dataframe 只能通过表达式的形式, 或者列的形式来访问数据, 只有 Dataset 支持针对于整个对象的操作3.Dataframe 中的数据表示为 Row, 是一个行的概念
如何理解 RDD、Dataframe 和 Dataset(总结)
数据结构 RDD
RDD(Resilient Distributed Datasets)叫做弹性分布式数据集,是Spark中最基本的数据抽象,源码中是一个抽象类,代表一个不可变、可分区、里面的元素可并行计算的集合。编译时类型安全,但是无论是集群间的通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化,还存在较大的GC的性能开销,会频繁的创建和销毁对象。
数据结构 Dataframe
与RDD类似,Dataframe是一个分布式数据容器,不过它更像数据库中的二维表格,除了数据之外,还记录这数据的结构信息(即schema)。Dataframe也是懒执行的,性能上要比RDD高(主要因为执行计划得到了优化)。由于Dataframe每一行的数据结构一样,且存在schema中,Spark通过schema就能读懂数据,因此在通信和IO时只需要序列化和反序列化数据,而结构部分不用。Spark能够以二进制的形式序列化数据到JVM堆以外(off-heap:非堆)的内存,这些内存直接受操作系统管理,也就不再受JVM的限制和GC的困扰了。但是Dataframe不是类型安全的。
数据结构 Dataset
Dataset是Dataframe API的一个扩展,是Spark最新的数据抽象,结合了RDD和Dataframe的优点。Dataframe=Dataset[Row](Row表示表结构信息的类型),Dataframe只知道字段,但是不知道字段类型,而Dataset是强类型的,不仅仅知道字段,而且知道字段类型。样例类CaseClass被用来在Dataset中定义数据的结构信息,样例类中的每个属性名称直接对应到Dataset中的字段名称。Dataset具有类型安全检查,也具有Dataframe的查询优化特性,还支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
RDD、Dataframe和DataSet之间的转换如下,假设有个样例类:
case class Emp(name:String),相互转换:
RDD转换到Dataframe:rdd.toDF(“name”) RDD转换到Dataset:rdd.map(x => Emp(x)).toDS Dataframe转换到Dataset:df.as[Emp] Dataframe转换到RDD:df.rdd Dataset转换到Dataframe:ds.toDF Dataset转换到RDD:ds.rdd
RDD与Dataframe或者Dataset进行操作,都需要引入隐式转换 import spark.implicits._ ,其中的 spark 是 SparkSession 对象的名称!



