栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Spark重点难点】SparkSQL YYDS(上)!

【Spark重点难点】SparkSQL YYDS(上)!

‍本文已经加入「大数据成神之路PDF版」中提供下载,你可以关注公众号,后台回复:「PDF」即可获取。

更多PDF下载可以参考:《重磅,大数据成神之路PDF可以分类下载啦!》

Spark重点难点系列:

  • 《【Spark重点难点01】你从未深入理解的RDD和关键角色》

  • 《【Spark重点难点02】你以为的Shuffle和真正的Shuffle》

  • 《【Spark重点难点03】你的数据存在哪了?》

  • 《【Spark重点难点04】你的代码跑起来谁说了算?(内存管理)》

Dataframe来源

Spark 社区在 1.3 版本发布了 Dataframe。那么,相比 RDD,Dataframe 到底有何不同呢?

Dataframe被称为SchemaRDD。Dataframe使Spark具备了处理大规模结构化数据的能力。在Spark中,Dataframe是一种以RDD为基础的分布式数据集,因此Dataframe可以完成RDD的绝大多数功能,在开发使用时,也可以调用方法将RDD和Dataframe进行相互转换。

在开发API方面,RDD算子多采用高阶函数,高阶函数的优势在于表达能力强,它允许开发者灵活地设计并实现业务逻辑。而 Dataframe的表达能力却很弱,它定义了一套DSL算子(Domain Specific Language)。

注意:所谓的高阶函数指的是,指的是形参为函数的函数,或是返回类型为函数的函数。

> 例如:我们在WordCount程序中调用flatMap算子:
lineRDD.flatMap(line => line.split(" "))
flatMap的入参其实是一个函数。

这里需要大家注意:是不是Dataframe表达能力弱就意味着Dataframe比RDD弱呢?

恰恰相反,因为Dataframe的算子大多数都是计算逻辑确定的,Spark就可以根据基于启发式的规则或策略甚至动态运行时的信息优化Dataframe的计算过程。

那么负责Dataframe的算子优化是谁来做的呢?正是SparkSQL。

Spark Core和Spark SQL的关系

我们可以用一句话描述这个关系: Spark SQL正是在Spark Core的执行引擎基础上针对结构化数据处理进行优化和改进。

上图揭示了Spark Core体系和Spark SQL体系的关系。在上图中,Spark Core作为整个Spark系统的底层执行引擎。负责了所有的任务调度、数据存储、Shuffle等核心能力。

而Spark SQL正是基于如此强大的Spark Core底层能力,形成一套独立的优化引擎系统。

简单的说,Spark SQL的计算任务通过SQL的形式最终转换成了RDD的计算。Spark SQL会对代码事先进行优化。

Dataframe的创建方式

Spark 本身支持种类丰富的数据源与数据格式,Dataframe的创建方式更是多种多样。

这里我们列举三类最常用的Spark Dataframe的创建方式。

createDataframe & toDF createDataframe方法

在SqlContext中使用createDataframe也可以创建Dataframe。数据可以来源于RDD或者自己创建的数组。

import org.apache.spark.sql.types._
val schema = StructType(List(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false),
      StructField("birthday", DateType, nullable = false)
    ))

val rdd = spark.sparkContext.parallelize(Seq(
      Row("小明", 18, java.sql.Date.valueOf("1990-01-01")),
      Row("小芳", 20, java.sql.Date.valueOf("1999-02-01"))
    ))
val df = spark.createDataframe(rdd, schema)
df.show()

createDataframe 方法有两个参数,第一个参数是RDD,第二个参数就是Schema信息。createDataframe需要的RDD的类型必须是 RDD[Row],其中的 Row 是 org.apache.spark.sql.Row,因此,对于类型为 RDD[(String, Int)]的 rdd,我们需要把它转换为RDD[Row]。

df.show()函数可以将数据进行输出:

+--------------+-------------+-----------+
|name          |age          |birthday   |
+--------------+-------------+-----------+
|小明           |           18| 1990-01-01|
|小芳           |           20| 1999-02-01|
+--------------+-------------+-----------+
toDF方法

我们可以通过导入spark.implicits, 然后通过在 RDD 之上调用 toDF 就能轻松创建 Dataframe。只要这些数据的内容能指定数据类型即可。

import spark.implicits._
val df = Seq(
    ("小明", 18, java.sql.Date.valueOf("1990-01-01")),
    ("小芳", 20, java.sql.Date.valueOf("1999-02-01"))
  ).toDF("name", "age", "birthday")
  
    df.show()

打印出来的结果为:

+--------------+-------------+-----------+
|name          |age          |birthday   |
+--------------+-------------+-----------+
|小明           |           18| 1990-01-01|
|小芳           |           20| 1999-02-01|
+--------------+-------------+-----------+

同样,我们可以将一个RDD转化为df:

val rdd = spark.sparkContext.parallelize(List(1,2,3,4,5))
val df = rdd.map(x=>(x,x^2)).toDF("a","b")
df.show()
通过文件系统创建Dataframe

Spark支持非常多的文件格式,例如CSV、JSON、ORC、Parquet等。支持的文件列表你可以参考这里:

https://docs.databricks.com/data/data-sources/index.html

我们以CSV文件举例,假设我们的文件数据为:

小明,18
小芳,20
val spark = SparkSession.builder()
      .appName("csv reader")
      .master("local")
      .getOrCreate()
 
    val result = spark.read.format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .option("nullValue", "\N")
      .option("inferSchema", "true")
      .load("path/demo.csv")
 
    result.show()
    result.printSchema()

当然,不同的文件格式有非常多的可选项,你可以参考上面给出的官网连接。

通过其他数据源创建Dataframe

我们可以通过指定连接参数例如数据库地址、用户名、密码等连接其他数据源。我们以MySQL为例:

val url = "jdbc:mysql://localhost:3306/test"
val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", "test")
          .option("user", "admin")
          .option("password", "admin")
          .load()
df.show()
+---+----+----+
| id|user|age|
+---+----+----+
|  1| 小明| 18|
|  2| 小芳| 20|
+---+----+----+
常用语法和算子

Spark SQL支持的算子列表非常非常多。

你可以在这里看到所有的算子列表:

https://spark.apache.org/docs/3.2.0/api/sql/index.html

我们举几个最常用的语法演示给大家看。

  • 单行查询

var userDF = List((1, "张三", true, 18, 15000, 1))
 .toDF("id", "name", "sex", "age", "salary", "dept")
userDF.createTempView("t_employee")
val sql = "select * from t_employee where name = '张三'"
spark.sql(sql).show()
  • 分组查询

var userDF= List( (1,"张三",true,18,15000,1),
 (2,"李四",false,18,12000,1),
 (3,"王五",false,18,16000,2)
) .toDF("id","name","sex","age","salary","dept")

//构建视图
userDF.createTempView("t_employee")
val sql=
 """
   |select dept ,avg(salary) as avg_slalary from t_employee
   |group by dept order by avg_slalary desc
 """.stripMargin
spark.sql(sql).show()
+----+-----------+
|dept|avg_slalary|
+----+-----------+
|   2|    16000.0|
|   1|    13500.0|
+----+-----------+
  • 开窗函数

// 开窗函数
var df=List(
 (1,"zs",true,1,15000),
 (2,"ls",false,2,18000),
 (3,"ww",false,2,14000),
 (4,"zl",false,1,18000),
 (5,"win7",false,1,16000)
).toDF("id","name","sex","dept","salary")
df.createTempView("t_employee")

val sql=
 """
   |select id,name,salary,dept,
   |count(id) over(partition by dept order by salary desc) as rank,
   |(count(id) over(partition by dept order by salary desc rows between current row and unbounded following) - 1) as low_than_me,
   |avg(salary) over(partition by dept rows between unbounded preceding and unbounded following) as avg_salary,
   |avg(salary) over() as all_avg_salary 
   |from t_employee t1 order by dept desc
 """.stripMargin
 
spark.sql(sql).show()

你可以参考这篇博客,这个博客中的例子非常多:

https://mask0407.blog.csdn.net/article/details/106716575

总结

本章我们讲解了Spark SQL的来源,Spark Dataframe创建的方式以及常用的算子。下篇我们将讲解Spark SQL中的Catalyst优化器和Tungsten,以及Spark SQL的Join策略选择。


《大数据成神之路》正在全面PDF化,你只需要在后台回复「PDF」就可以看到阿里云盘下载链接了!

目前我把这些文章按照体系全部整理好了。现在你可以方便的进行查找:

电子版把他们分类做成了下面这个样子,并且放在了阿里云盘提供下载。我们点开一个文件夹后:你只需要在后台回复「PDF」就可以看到阿里云盘下载链接了!

Hi,我是王知无,一个大数据领域的原创作者。 

放心关注我,获取更多行业的一手消息。

八千里路云和月 | 从零到大数据专家学习路径指南

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」‍

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/670999.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号