栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark sql笔记1

spark sql笔记1

目录

SparkSql的特点

Dataframe

DataSet

SparkSql语法

创建Dataframe的方法

SQL语法

DSL语法

RDD、Dataframe、DataSet联系


Spark sql的前身是Shark,Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。但是Shark继承了Hive的大且复杂的代码使得Shark很难优化和维护。于是后面就分化出了两个框架,一个是SparkSql,一个是hive on spark

与hive和mapreduce的关系类似

出现了SparkSql后,简化了RDD的开发,并为我们提供了两个新的编程模型:Dataframe和DataSet

1、SparkSql的特点

(1)整合了Sql查询和Spark-core编程

(2)统一数据访问,可以用相同的连接方式连接不同的数据源

(3)兼容Hive,可以在已有的数据仓库上直接运行HiveQL

  (4)提供标准的数据连接,通过JDBC或者ODBC来连接

2、Dataframe

在Spark中,Dataframe是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,

Dataframe带有Schema元信息,即Dataframe所表示的二维表数据集的每一列都带有名称和类

型,但底层做了更多的优化。Dataframe可以从很多数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表。

 Dataframe与RDD的区别
RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,Dataframe可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。Dataframe和普通的RDD的逻辑框架区别如下所示:



上图直观地体现了Dataframe和RDD的区别。
左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解 Person类的内部结构。
而右侧的Dataframe却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。Dataframe多了数据的结构信息,即schema。这样看起来就像一张表了,Dataframe还配套了新的操作数据的方法,Dataframe API(如df.select())和SQL(select id, name from xx_table where …)。
此外Dataframe还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存. 由于Spark理解schema, 所以知道该如何操作。
RDD是分布式的Java对象的集合。Dataframe是分布式的Row对象的集合。Dataframe除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效
率、减少数据读取以及执行计划的优化。
有了Dataframe这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。
不仅如此,通过Dataframe API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

3、DataSet

DataSet是分布式的数据集合。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map/flatmap/filter)进行多种操作。

>DataSet是Dataframe API的一个扩展,是SparkSQL最新的数据抽象;

>用户友好的API风格,既有安全检查,也有Dataframe的查询优化特性;

>用样例类对DataSet定义数据的结构信息,样例类中的每个属性名称直接映射大宋DataSet中的字段名称;

>DataSet是强类型的,比如可以有DataSet[Car],DataSet[Person]

>Dataframe是DataSet的特例,Dataframe=DataSet[Row],可以通过as方法转换成DataSet,Row是一个类型,和上面的Car,Person类型一样,所有的表结构信息都由Row来表示,获取数据需要指定顺序 

SparkSql语法

我们首先进入spark-shell

spark已经为我们准备好了环境,写spark core的sc(spark context),写spark sql的spark (spark session)

 spark提供了相当多的读取文件格式

文件目录是hdfs上的, 首先来看一看读取csv格式的数据,但是发现一个问题,他没有列名

(Dataframe.show()展示前20条数据)

val df=spark.read.csv("/input/students.txt")

 所以我们需要手动添加列名

val stuDF =spark.read .format("csv").option("sep", ",") .schema("id Int,name String,age Int,gender String,clazz String") .load("/input/students.txt")

 如果是直接读取json格式的文件,会自带列名

但是这上面好像和sql不太沾边啊,我们想要写sql,需要先根据Dataframe创建一个临时视图,然后根据视图查询(要看查询结果的一定要加show()方法)

stuDF.createTempView("stu_view")

spark.sql("select age from stu_tb").show()

 

创建Dataframe的方法

1、从spark数据源创建

查看通过spark读取文件数据源 : spark.read.文件格式(”文件路径“)

如果是从内存中获取数据,spark可以知道数据类型是什么。如果是数字,默认作为Int处理,但是从文件中读取的数字,不能确定是什么类型,用bigint接收,可以和Long类型转换,但是不能和Int转换

 2、从一个存在的 RDD进行转换

RDD转换成Dataframe

首先需要导入隐式转换 import 

Dataframe相比RDD还多了一个结构,所以定义的时候需要指定结构

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))

val df= rdd.toDF("id")

df.show()

Dataframe直接通过rdd方法,就可以转换成RDD,舍弃自身的结构

 

 

3、从Hive中的table查询返回

SQL语法

1、读取json文件创建Dataframe

 查看通过spark读取文件数据源 : spark.read.文件格式(”文件路径“)

 2、对Dataframe创建临时视图

stuDF2.createOrReplaceGlobalTempView("stu_view")

 如果相同的视图创建多了,为了解决重复问题使用上面的方法更好

3、通过SQL查询全表

spark.sql("select * from stu_view").show()

4、全局范围 

stuDF2.createOrReplaceTempView("stu_view")

spark.newSession.sql("select * from stu_view").show()

我创建新连接之后,无法查询到这张表 

 首先创建Global视图,在下面的查询语句中,也需要指定global.temp

stuDF2.createOrReplaceGlobalTempView("stu_view")

spark.newSession.sql("select * from global_temp.stu_view").show

确实可以查询到 

 

 

DSL语法

Dataframe提供了DSL去管理结果化的数据,Scala,java,Python和R中使用DSL,使用DSL就不必创建临时视图了

1、创建一个Dataframe

val stuDF2=spark.read.json("/input/students.json")

2、查看Dataframe的结构 

stuDF2.printSchema

3、查看单独列的数据

stuDF2.select("name").show()

 4、查看"name"数据以及”age+1“数据

涉及运算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名

当然平时也是可以直接加在字段前加‘$’使用,'$'表示引用这个数据

                    

 

 

stuDF2.select('name,'age+1).show()

 

5、filter过滤数据 

 6、grouBy分组,和sql中的一样

7、where过滤数据

使用where的时候需要加上'$'

 

8、agg聚合

stuDF2.groupBy($"clazz",$"gender").agg(count($"gender"))show

 

 

DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息

创建DataSet

case class Student(id:Long,name:String,age:Long,gender:String,clazz:String)

val list=List(Student(1500100001,"施笑槐",22,"女","文科六班"),Student(1500100002,"吕金鹏",24,"男","文科六班"))

val ds=list.toDS

ds.show

 DataSet通过rdd方法直接转换成RDD

val rdd=ds.rdd

Dataframe通过as指定类型,可以转换成DataSet,通常是指定一个样例类

val stuDF=list.toDF

case class Student(id:Long,name:String,age:Long,gender:String,clazz:String)

val ds=stuDF.as[Student]

DataSet通过toDF方法可以转换成DataSet

val df=ds.toDF

如果RDD中的数据通过样例类创建,因为已经有了需要的结构,就可以直接转换成DataSet

val list=List(Student(1500100001,"施笑槐",22,"女","文科六班"),Student(1500100002,"吕金鹏",24,"男","文科六班"))

val ds=list.toDS

ds.show

转换关系图: 

 ​​​​​​​

 

 

 RDD、Dataframe、DataSet联系

相同点

1、RDD、Dataframe、DataSet都是spark平台下的分布式弹性数据集

2、都是懒执行机制,遇到action算子才会开始计算

3、相同的函数很多,如fliter(继承RDD中的方法)

4、Dataframe和DataSet使用需要导入隐式转换(import spark.implicits._)

1)RDD

优点:

编译时类型安全 

编译时就能检查出类型错误

面向对象的编程风格 

直接通过类名点的方式来操作数据

缺点:

序列化和反序列化的性能开销 

无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。

GC的性能开销,频繁的创建和销毁对象, 势必会增加GC

2)Dataframe

Dataframe引入了schema和off-heap

schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。

3)DataSet

DataSet结合了RDD和Dataframe的优点,并带来的一个新的概念Encoder。

当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761609.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号