栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark基础之:Spark SQL介绍

Spark基础之:Spark SQL介绍

Spark基础之:Spark SQL介绍

一.Spark SQL的概述

1、Spark SQL来源2、从代码看Spark SQL的特点3、从代码运行速度看来看Spark SQL 二.Spark SQL数据抽象

Dataframe

1)Dataframe的组成2)Dataframe的代码构建

<1>基于rdd的方式1<2>基于rdd的方式2<3>基于rdd的方式3<4>基于pandas的Dataframe 3)Dataframe读取外部数据

<1>Dataframe读取json文件<2>Dataframe读取csv文件<3>Dataframe读取parquet文件 4)Dataframe数据写出 三.Spark SQL定义UDF函数四.Spark SQL执行流程

一.Spark SQL的概述 1、Spark SQL来源

Hive是目前大数据领域,事实上的数据仓库标准。

Hive与RDBMS的SQL模型比较类似,容易掌握。 Hive的主要缺陷在于它的底层是基于MapReduce的,执行比较慢。

在Spark 0.x版的时候推出了Shark,Shark与Hive是紧密关联的,Shark底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块,底层使用Spark的基于内存的计算模型,性能上比Hive提升了很多倍。

在Spark 1.x的时候Shark被淘汰。在2014 年7月1日的Spark Summit 上, Databricks宣布终止对Shark的开发,将重点放到 Spark SQL 上。

Shark终止以后,产生了两个分支:

1). Hive on Spark
hive社区的,源码在hive中

2). Spark SQL(Spark on Hive)
Spark社区,源码在Spark中,支持多种数据源,多种优化技术,扩展性好很多;

Spark SQL的源码在Spark中,而且新增了许多的优化代码,如果追求速度,例如数据分析的时候,可以使用Hive on Spark,如果追求性能,例如生产的定时报表的时候,应该使用Spark SQL。

2、从代码看Spark SQL的特点

我们来对比Spark RDD、Dataframe、SQL代码实现wordcount:

我们可以看到,Spark SQL代码看起来与关系型数据库是一致的,从上图可以看到Spark SQL的特点:

1). 集成
通过Spark SQL或Dataframe API运行Spark 程序,操作更加简单、快速.

从上图可以看到,Spark SQL和Dataframe底层其实就是调用RDD

2). 统一的数据访问
Dataframe 和SQL提供了访问各种数据源的通用方式,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以跨这些数据源连接数据。

3). Hive集成
在现有的数据仓库上运行SQL或HiveQL查询。

4). 标准的连接
服务器模式为业务智能工具提供行业标准的JDBC和ODBC连接。

3、从代码运行速度看来看Spark SQL


从上图我们可以看到:
1). Python操作RDD比Java/Scala慢一倍以上
2). 无论是那种语言操作Dataframe,性能几乎一致

那么为什么Python用RDD这么慢?
为什么用Python写的RDD比Scala慢一倍以上,两种不同的语言的执行引擎,上下文切换、数据传输。

Spark SQL其实底层调用的也是RDD执行,其实中间的执行计划进行了优化,而且是在Spark的优化引擎里面,所以无论是那种语言操作Dataframe,性能几乎一致

二.Spark SQL数据抽象

Spark SQL提供了两个新的抽象,分别是Dataframe 和Dataset;

Dataset是数据的分布式集合。Dataset是Spark 1.6中添加的一个新接口,它提供了RDDs的优点(强类型、使用强大lambda函数的能力)以及Spark SQL优化的执行引擎的优点。可以从JVM对象构造数据集,然后使用函数转换(map、flatMap、filter等)操作数据集。数据集API可以在Scala和Java中使用。Python不支持Dataset API。但是由于Python的动态特性,Dataset API的许多优点已经可以使用了(例如,您可以通过名称natural row. columnname访问行字段)。R的情况也是类似的。

Dataframe 是组织成命名列的Dataset。它在概念上相当于关系数据库中的表或R/Python中的数据框架,但在底层有更丰富的优化。数据框架可以从各种各样的数据源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有的rdd。Dataframe API可以在Scala、Java、Python和r中使用。在Scala和Java中,Dataframe是由行数据集表示的。在Scala API中,Dataframe只是Dataset[Row]的类型别名。而在Java API中,用户需要使用Dataset来表示Dataframe。

Dataframe

Dataframe的前身是SchemaRDD。Spark1.3更名为Dataframe。不继承RDD,自己实现RDD的大部分功能。与RDD类似,Dataframe也是一个分布式数据集

    Dataframe可以看做分布式Row对象的集合,提供了由列组成的详细模式信息,使其可以得到优化,Dataframe不仅有比RDD更多的算子,还可以进行执行计划的优化

    Dataframe更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema

    Dataframe也支持嵌套数据类型(struct、array和Map)

    Dataframe API提供的是一套高层的关系操作,比函数式RDD API更加优化,门槛低

    Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错。

1)Dataframe的组成

Dataframe是一个二维表的结构,既然是表结构,就有无法绕开的三点:

行列表结构描述

基于这个前提,Dataframe的组成如下:

在结构层面:

StructType对象描述整个Dataframe的表结构。StructField描述一个列的信息。

在数据层面:

row对象记录一行数据。Column对象记录一列数据并包含列的信息。

代码示例:

一个StructField记录:列明,列类型,列是否运行为空,多个StructField组成一个StructType对象。
一个StructType对象可以描述一个Dataframe:有几个列,每个列的名字和类型,每个列是否为空。 2)Dataframe的代码构建

Dataframe对象可以从rdd转换而来,都是分布式的数据集,其实就是转换一下内部的存储结构,转换为二维表的结构。

<1>基于rdd的方式1

调用Spark

# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textGile("../data/sql/people.txt").
	map(lambda x: x.split(',')).
	map(lambda x: [x[0], int(x[1])])

#构建DF方式1
df = spark.createDataframe(rdd, schema = ['name', 'age'])

# 打印表结构
df.printSchema()

# 打印20行数据
df.shwow()

df.createTempView("table1") # 给这个表起一个别名
spark.sql("select * from table1 where age< 30").show()

通过SparkSession对象的createDataframe方法来将rdd转换为Dataframe。
这里只传列名称,类型从RDD中进行推断,是否允许为空默认为允许(Ture)。

<2>基于rdd的方式2

通过StructType对象来定义Dataframe的“表结构”转换rdd

# 创建DF 首先创建RDD 将RDD转换为DF
rdd =  sc.textFile("../data/sql/stu_score.txt").
	map(lambda x: x.split(',')).
	map(lambda x: (int(x[0]), x[1], int(x[2])))

# StructTYpe 类
# 这个类可以定义整个Dataframe中的Schema
schema = StructType().
	add("id", IntegerTYpe(), nullable = False).
	add("name", StringType(), nullable = True).
	add("score", IntegerType(), nullable = False)
# 一个add方法定义一个列的信息,如果有3个列,就写3个add
# add方法: 参数1:列名称, 参数2:列类型, 参数3:是否允许为空
df = spark.createDataframe(rdd, schema)
sd.printSchema()
df.show()
<3>基于rdd的方式3

使用RDD的toDF方法转换RDD

# 创建DF 首先创建RDD 将RDD转换为DF
rdd =  sc.textFile("../data/sql/stu_score.txt").
	map(lambda x: x.split(',')).
	map(lambda x: (int(x[0]), x[1], int(x[2])))

# StructTYpe 类
# 这个类可以定义整个Dataframe中的Schema
schema = StructType().
	add("id", IntegerTYpe(), nullable = False).
	add("name", StringType(), nullable = True).
	add("score", IntegerType(), nullable = False)
# 一个add方法定义一个列的信息,如果有3个列,就写3个add
# add方法: 参数1:列名称, 参数2:列类型, 参数3:是否允许为空

# 方式1:只传列名,类型靠推断,是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()

# 方式2:传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()
<4>基于pandas的Dataframe

将pandas的Dataframe对象,转变为分布式的SparkSQL Dataframe对象。

# 构建pandas的DF
pdf = pd.Dataframe({
	"id": [1, 2, 3]
	"name": ["张三", "李四", "王五"]
	"age": [11, 12, 13]
})
# 将Pandas的DF对象转换成Spark的DF
df = spark.createDataframe(pdf)
df.printSchema()
df.show()
3)Dataframe读取外部数据

Dataframe可以读取的文件格式有:json文件、csv文件、parquet文件等。下面我着重介绍几个常见的文件格式的读取。

<1>Dataframe读取json文件

读取json数据源
使用format(“json”)读取json数据

代码示例:

df = spark.read.format("json").
	load("../data/sql/people.json")
# JSON类型一般不用写schema,json自带有列名和列的类型
df.printSchema()
df.show()
<2>Dataframe读取csv文件

读取csv数据源
使用format(“csv”)读取json数据

代码示例:

df = spark.read.format("csv").
	option("sep", ";"). # 列分隔符
	option("header", False). # 是否有csv标头
	option("ending", "utf-8"). # 指定列名和类型
	load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()
<3>Dataframe读取parquet文件

读取parquet数据源
使用format(“parquet”)读取json数据

代码示例:

# parquet自带schema,直接load就可以
df = spark.read.format("parquet").
	load("../data/sql/people.parquet")
df.printSchema()
df.show()
4)Dataframe数据写出

SparkSQL统一写出Dataframe数据
统一API语法:

df.write.mode().format().option(K, V).save(PATH)
# mode, 传入模式字符串可选:append 追加, overwrite 覆盖, ignore 忽略,error 重复就报异常(默认的)
# format, 传入格式字符串, 可选: text, csv, json, parquet, orc, avro, jdbc
# 注意:text源只支持单列的df写出
# option设置属性, 如:.option("sep",",")
# save 写出的路径,支持本地文件和HDFS

代码示例:

# write text 写出,只能写出一个单列数据
df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).
	write.
	mode("overwrite").
	format("text").
	save("../data/output/sql/text")
	
# write CSV 写出
df.write.mode("overwrite").
	format("csv").
	option("sep", ",").
	option("header", True).
	save("../data/output/sql/csv")

# write json 写出
df.write.mode("overwrite").
	format("json").
	save("../data/output/sql/json")

# write Parquet 写出
df.write.mode("overwrite").
	format("parquet").
	save("../data/output/sql/parquet")

# b不给format,默认parquet写出
df.write.mode("overwite").save("../data/output/sql/parquet_default")
三.Spark SQL定义UDF函数



四.Spark SQL执行流程

参考文章链接:https://blog.csdn.net/u010520724/article/details/116267767

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706835.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号