小谈每天忙忙碌碌却不知道自己干了什么
最近有些力不从心,学习没有精力,驾考预约一直吊我。哭唧唧
正题上篇介绍了Data frame是Spark SQL的核心,本篇来介绍两种方式进行Spark SQL操作
使用Spark SQL进行数据分析,可以有两种选择,第一个就是DSL语言,第二个就是SQL语言。当然,也可以使用Hive SQL。
使用DSL进行操作
首先创建Spark Session对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
使用Data frame的API进行分析
//读取Json文件,返回一个Data frame对象
val frame = sparkSession.read.json("date/people.json")
//以表格的形式查看people信息
frame.show()
来看一下表格的样式
通过读取json文件创建Data frame对象,Data frame提供了灵活,并且强大的自带优化的API,例如select,where,order by,group by ,limit,union这样的算子操作,Data frame将SQL select语句个各个组成部分封装成同名API,可以让我们SQL Boy更加迅速的熟悉Spark SQL。不需要进行RDD也能进行数据分析。
下面通过几个实例来使用Data frame的Api
- 以树格式输出Data frame对象的结构信息
frame.printSchema()
可以看到,root根节点之后,下面的字段都是json里面的字段,也是以表结构输出来的那些字段。同时也标注了各个字段的类型。
2.在SQL里面,最常写的句子就是Select,大部分都是查询,可以说几乎都是查询。在DSL里面也有select,上面说过了,Data frame 的Api中也是有相对应的SQL 关键字。下面通过select展示一下
在SQL里面使用查询语句
select name from 表
DSL中是这样的
frame.select("name").show()
为什么要show,show的时候就会将数据以表结构展示出来,如果不用show方法,会是什么样子呢
println(frame.select("name"))
显示的这个字段的类型,也就是刚开始printSchema方法里面的
上面展示的Select方法只是其中一个,对于笔者来说,并不认为这个方式好,我更喜欢下面的方式
frame.select($"name")
看一下结果吧
2.使用组合DSL进行分析
查找年龄大于25的姓名和下一年的年龄以及性别并且按照年龄升序排序
frame.select($"name",$"age" +1,$"gender") .where($"age" > 25)
.orderBy(frame("age").asc) .show()
看一下结果
当然啦,也可以进行分组聚合。在SQL里面是group by + sum,在DSL中,需要先groupby然后计数的时候直接使用count
val frame = sparkSession.read.json("date/people.json")
//分组聚合
frame.groupBy($"age" ).count().show()
上面的实例通过灵活使用Data frame提供的Api实现了SQL一样的操作,但是如果放在RDD编程中,对于分组聚合这个操作,需要先groupbykey 然后再map转换。
且不说RDD读取Json会转换成RDD[String],然后再转换成其它RDD类型的。
Spark SQL可以直接解析Json并且推断出结构信息(Schema)
如果你不想学习DSL,没有关系,下面就来介绍使用SQL进行查询
执行SQL查询SparkSession提供了直接读取SQL的方法,SQL语句可以直接作为字符串传入到sql方法里面,并且返回的对象就是Data frame对象。不过想要这样执行,需要先将Data frame对象注册为临时表,然后才可以进行操作
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
//读取json文件
val frame = sparkSession.read.json("date/people.json")
//注册为临时表
frame.createOrReplaceTempView("people")
//调用Spark Session的SQL接口,对临时表进行SQL查询
sparkSession.sql("select age,count(*) from people group by age").show()
看一下结果
需要像注册为临时表,才可以进行SQL查询。但是临时表有一个问题,这个Spark Session结束的时候,这个表就不能用了,因此就有了全面临时表
全面临时表全面临时表的作用范围是某个Spark 应用程序内所有会话,会持续存在,在所有会话中共享。下面演示一下
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
//读取json文件
val frame = sparkSession.read.json("date/people.json")
//注册为全局表
frame.createOrReplaceGlobalTempView("people")
//进行查询
sparkSession.sql("select name,age from people").show()
//创建一个新的会话
sparkSession.newSession().sql("select name,age from people").show()
注意,出错误了
看一下错误信息
没有找到people这个全局表。我们已经设置了,为什么会出现没有这个表呢?
引用全局表需要global_temp 进行标识。这个global_temp就相当于系统的数据库,全局表在这个数据库里面。
既然知道了错误原因,下面来看一看正确的代码和结果
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_Sql")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
//读取json文件
val frame = sparkSession.read.json("date/people.json")
//注册为全局表
frame.createOrReplaceGlobalTempView("people")
//进行查询
sparkSession.sql("select name,age from global_temp.people").show()
//新的会话
sparkSession.newSession().sql("select name,age from global_temp.people").show()
没有报错,并且还有结果
总结
明天更新的是RDDS 和 Data frame 和 Data Set之间的关系以及转换。
今天写的内容不算多,也挺少的,明天会拿出一个下午的时间来写一篇篇幅较长的文章
今天的科目四依旧还是受理中
今天也是想她的一天



