SparkSQL编程 Dataframe编程 sparkSessionSpark SQL是Spark用于结构化数据(structured data)处理的Spark模块。
Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL其实可以理解为对Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。 在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。 SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark框架会自动的创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样Dataframe
Spark SQL的Dataframe API 允许我们使用 Dataframe 而不用必须去注册临时表或者生成 SQL 表达式。Dataframe API 既有 transformation操作也有action操作。
创建Dataframe
在Spark SQL中SparkSession是创建Dataframe和执行SQL的入口,创建Dataframe有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。
1)从Spark数据源进行创建
# 查看Spark支持创建文件的数据源格式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
# 在spark的bin/data目录(windows环境)中创建user.json文件
{"username":"zhangsan","age":20}
# 读取json文件创建Dataframe
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.Dataframe = [age: bigint, username: string]
注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换
# 展示结果:show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
2)从RDD进行转换
在后续章节中讨论
3)从Hive Table进行查询返回
SQL语法
SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助
1)读取JSON文件创建Dataframe
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.Dataframe = [age: bigint, username: string]
2)对Dataframe创建一个临时表(视图)
scala> df.createOrReplaceTempView("people")
3)通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.Dataframe = [age: bigint, name: string]
4)结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
5)对于Dataframe创建一个全局表
scala> df.createGlobalTempView("people")
6)通过SQL语句实现查询全表
scala> spark.sql("SELECt * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
scala> spark.newSession().sql("SELECt * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
DSL语法
Dataframe提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
1)创建一个Dataframe
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.Dataframe = [age: bigint, name: string]
2)查看Dataframe的Schema信息
scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- username: string (nullable = true)
3)只查看"username"列数据,
scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
| lisi|
| wangwu|
+--------+
4)查看"username"列数据以及"age+1"数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> df.select($"username",$"age" + 1).show
scala> df.select('username, 'age + 1).show()
scala> df.select('username, 'age + 1 as "newage").show()
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan| 21|
| lisi| 31|
| wangwu| 41|
+--------+---------+
5)查看"age"大于"30"的数据
scala> df.filter($"age">30).show
+---+---------+
|age| username|
+---+---------+
| 40| wangwu|
+---+---------+
6)按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 20| 1|
| 30| 1|
| 40| 1|
+---+-----+
DataSet编程
创建DataSetDataSet是具有强类型的数据集合,需要提供对应的类型信息。
1)使用样例类序列创建DataSet
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
scala> caseClassDS.show
+---------+---+
| name|age|
+---------+---+
| zhangsan| 2|
+---------+---+
2)使用基本类型的序列创建DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+
RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。
scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
DataSet转换为RDD
DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD
scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at :25
scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))
Dataframe和 DataSet转换
Dataframe其实是DataSet的特例,所以它们之间是可以互相转换的。
# Dataframe转换为DataSet as[T]
scala> case class User(name:String, age:Int)
defined class User
scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.Dataframe = [name: string, age: int]
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
# DataSet转换为Dataframe toDF
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> val df = ds.toDF
df: org.apache.spark.sql.Dataframe = [name: string, age: int]
IDEA开发SparkSQL
基本使用
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
object SparkSQL02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//SparkSession
//Scala中构造方法前端加上private关键字,表示构造方法私有
//scala中构造方法分为主构造方法与辅助构造方法
//只有主构造方法才能完成类的初始化,辅助构造方法必须显示或者间接调用主构造方法
//SparkSession无法直接构建对象,需要通过Builder构建
val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
//简单操作
//注意:spark读取json文件中的方式是按行读取,每一行都是json
val frame: Dataframe = spark.read.json("data/user.json")
//查询
frame.show
//将DF转为试图
frame.createTempView("user")
//采用sql查询
spark.sql("select avg(age) from user").show
//采用DSL语法
frame.select("username").show
//使用$引用
// 进行操作时,必须使用隐式转换,导入对象的内容,必须使用var修饰
import spark.implicits._
frame.select($"username",$"age" + 1).show
spark.stop()
}
}
rdd,df,ds相互转换
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}
object SparkSQL03 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
List(
(1, "zhangsan", 20),
(1, "list", 20),
(1, "wangwu", 20)
)
)
// RDD => Dataframe 增加结构
// Dataframe就是一个DataSet[Row],但是类型时固定的: Row
val df: Dataframe = rdd.toDF("id","name","age")
// Dataframe => DataSet 增加类型
val ds: Dataset[User] = df.as[User]
//DataSet => Dataframe
val frame: Dataframe = ds.toDF()
//Dataframe => rdd
val newRdd: RDD[Row] = frame.rdd
// RDD => DataSet 增加结构与类型
val newDS: Dataset[User] = rdd.map {
case (id, name, age) => {
User(id, name, age)
}
}.toDS()
// DataSet => RDD
val rdd1: RDD[User] = newDS.rdd
spark.stop()
}
case class User(id:Int,name:String,age:Int)
}
用户自定义函数
用户可以通过spark.udf功能添加自定义函数,实现自定义功能。
UDF
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
object SparkSQL04_UDF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
import spark.implicits._
val frame: Dataframe = spark.read.json("data/user.json")
frame.createTempView("user")
//自定义函数
//可以将udf函数理解为map功能函数
spark.udf.register("PrefixNmae",(name:String)=>{"用户姓名: "+name})
spark.sql("select PrefixNmae(username) from user").show
}
}
用户自定义聚合函数
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataframe, Encoder, Encoders, SparkSession, functions}
object SparkSQL05_UDAF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
import spark.implicits._
val frame: Dataframe = spark.read.json("data/user.json")
frame.createTempView("user")
//UDF不需要聚合数据,所以轮询的每一条数据处理完毕就结束了
//UDAF需要聚合数据,所以轮询时,需要将数据处理的状态保存起来,保存在缓冲区
//sql:弱类型操作,Dataframe
//Aggregator:强类型操作 DataSet
spark.udf.register("ageAvg",functions.udaf(new AgeAvgUDAF))
spark.sql("select ageAvg(age) from user").show
spark.stop()
}
//1.需要继承Aggregator
// IN: 聚合函数的输入
// BUFF: 聚合函数中用于计算的缓冲区类型
// OUT: 聚合函数的输出
//2.重写方法
//逻辑(4)
//编码(2)
case class AvgBuff(var total:Long, var count:Long)
class AgeAvgUDAF extends Aggregator[Long,AvgBuff,Long]{
//返回初始化的缓冲区对象
override def zero: AvgBuff = {
AvgBuff(0,0)
}
//聚合:用当前的数据更新缓冲区
override def reduce(buff: AvgBuff, age: Long): AvgBuff = {
buff.total = buff.total + age
buff.count = buff.count + 1
buff
}
//合并:分布式,多个缓冲区数据合并
override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = {
b1.total = b1.total + b2.total
b1.count = b1.count + b2.count
b1
}
//完成,计算聚合函数的值
override def finish(buff: AvgBuff): Long = {
buff.total / buff.count
}
override def bufferEncoder: Encoder[AvgBuff] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
SparkSQL数据的加载与保存
通用的加载和保存方式
SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parquet
加载数据
# park.read.load 是加载数据的通用方法
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
# 如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")
- format("…"):指定加载的数据类型,包"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
- load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
- option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
# 我们前面都是使用read API 先把文件加载到 Dataframe然后再查询,其实,我们也可以直接在文件上进行查询: 文件格式.`文件路径`
scala>spark.sql("select * from json.`/opt/module/data/user.json`").show
保存数据
# df.write.save 是保存数据的通用方法
scala>df.write.
csv jdbc json orc parquet textFile… …
# 如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")
format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
保存操作可以使用 SaveMode, 用来指明如何处理数据,使用mode()方法来设置。
有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
SaveMode是一个枚举类,其中的常量包括:
| Scala/Java | Any Language | Meaning |
|---|---|---|
| SaveMode.ErrorIfExists(default) | “error”(default) | 如果文件已经存在则抛出异常 |
| SaveMode.Append | “append” | 如果文件已经存在则追加 |
| SaveMode.Overwrite | “overwrite” | 如果文件已经存在则覆盖 |
| SaveMode.Ignore | “ignore” | 如果文件已经存在则忽略 |
JsonSpark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。
数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。
Spark SQL 能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载JSON 文件。
注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。格式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]
1)导入隐式转换
import spark.implicits._
2)加载JSON文件
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path) # 可以直接展示出来
3)创建临时表
peopleDF.createOrReplaceTempView("people")
4)数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERe age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
CSV
Spark SQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列
spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")
MySQL
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建Dataframe,通过对Dataframe一系列的计算后,还可以将数据再写回关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。
读数据
spark.read.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/gmall")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123123")
.option("dbtable", "sku_info")
.load().show
写数据
//ds就是dataSet,已经存在了数据
ds.write.format("jdbc")
.option("url","jdbc:mysql://hadoop102:3306/gmall")
.option("user":"root")
.option("password":"123123")
.option("dbtable":"user")
.mode("append") // 指定模式
.save
Hive
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。 若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。 spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
内置hive
一般用于测试
如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
Hive 的元数据存储在 derby 中, 默认仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
。。。
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> spark.sql("create table aa(id int)")
。。。
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aa| false|
+--------+---------+-----------+
向表加载本地数据
scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
。。。
scala> spark.sql("select * from aa").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
在实际使用中, 几乎没有任何人会使用内置的 Hive
外置hive
如果想连接外部已经部署好的Hive,需要通过以下几个步骤:
Ø Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
Ø 把Mysql的驱动copy到jars/目录下
Ø 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
Ø 重启spark-shell
scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| emp| false|
| default|hive_hbase_emp_table| false|
| default| relevance_hbase_emp| false|
+--------+--------------------+-----------+
scala>
IDEA连接hive
添加依赖
org.apache.spark spark-hive_2.12 3.0.0 org.apache.hive hive-exec 1.2.1 mysql mysql-connector-java 5.1.27
代码示例
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object SparkSQL06_HIVE {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")
val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()
import spark.implicits._
//TODO idea开发HIVE操作,需要遵循以下几个步骤
//添加依赖关系
//在创建环境对象时,启用Hive知此恨
//将hive的配置文件放置在classpath中
spark.sql("show tables").show
spark.stop()
}
}
spark-sql/spark-beeline
spark-sql
Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口
# 可以直接写sql了 bin/spark-sql
spark-beeline
Spark Thrift Server是Spark社区基于HiveServer2实现的一个Thrift服务。旨在无缝兼容HiveServer2。因为Spark Thrift Server的接口和协议都和HiveServer2完全一致,因此我们部署好Spark Thrift Server后,可以直接使用hive的beeline访问Spark Thrift Server执行相关语句。Spark Thrift Server的目的也只是取代HiveServer2,因此它依旧可以和Hive metastore进行交互,获取到hive的元数据。
# 如果想连接Thrift Server,需要通过以下几个步骤: - Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下 - 把Mysql的驱动copy到jars/目录下 - 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下 - 启动Thrift Server sbin/start-thriftserver.sh # 使用beeline连接Thrift Server bin/beeline -u jdbc:hive2://hadoop102:10000 -n rootSparkSQL项目实战 数据准备
我们这次 Spark-sql 操作中所有的数据均来自 Hive,首先在 Hive 中创建表,,并导入数据。
一共有3张表: 1张用户行为表,1张城市表,1 张产品表
CREATE TABLE `user_visit_action`( `date` string, `user_id` bigint, `session_id` string, `page_id` bigint, `action_time` string, `search_keyword` string, `click_category_id` bigint, `click_product_id` bigint, `order_category_ids` string, `order_product_ids` string, `pay_category_ids` string, `pay_product_ids` string, `city_id` bigint) row format delimited fields terminated by 't'; load data local inpath 'input/user_visit_action.txt' into table user_visit_action; CREATE TABLE `product_info`( `product_id` bigint, `product_name` string, `extend_info` string) row format delimited fields terminated by 't'; load data local inpath 'input/product_info.txt' into table product_info; CREATE TABLE `city_info`( `city_id` bigint, `city_name` string, `area` string) row format delimited fields terminated by 't'; load data local inpath 'input/city_info.txt' into table city_info;
// 在IDEA中创建数据,其中sprakSqlReq数据库已在hive的命令行窗口中创建完毕
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object SparkSQL07_Req {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "atguigu") //放在代码最前面
val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")
val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()
import spark.implicits._
//生成模拟数据
spark.sql("use sprakSqlReq")
//JSON SQL一般采用多行字符串
spark.sql(
"""
|CREATE TABLE `user_visit_action`(
| `date` string,
| `user_id` bigint,
| `session_id` string,
| `page_id` bigint,
| `action_time` string,
| `search_keyword` string,
| `click_category_id` bigint,
| `click_product_id` bigint,
| `order_category_ids` string,
| `order_product_ids` string,
| `pay_category_ids` string,
| `pay_product_ids` string,
| `city_id` bigint)
|row format delimited fields terminated by 't'
|""".stripMargin)
spark.sql(
"""
|load data local inpath 'data/user_visit_action.txt' into table sprakSqlReq.user_visit_action
|""".stripMargin);
spark.sql(
"""
|CREATE TABLE `product_info`(
| `product_id` bigint,
| `product_name` string,
| `extend_info` string)
|row format delimited fields terminated by 't'
|""".stripMargin)
spark.sql(
"""
|load data local inpath 'data/product_info.txt' into table sprakSqlReq.product_info
|""".stripMargin);
spark.sql(
"""
|CREATE TABLE `city_info`(
| `city_id` bigint,
| `city_name` string,
| `area` string)
|row format delimited fields terminated by 't'
|""".stripMargin)
spark.sql(
"""
|load data local inpath 'data/city_info.txt' into table sprakSqlReq.city_info
|""".stripMargin);
spark.sql(
"""
|select * from city_info
|""".stripMargin).show(20) //只显示20行
}
}
需求一,各区域热门商品 Top3
这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。
| 地区 | 商品名称 | 点击次数 | 城市备注 |
|---|---|---|---|
| 华北 | 商品A | 100000 | 北京21.2%,天津13.2%,其他65.6% |
| 华北 | 商品P | 80200 | 北京63.0%,太原10%,其他27.0% |
| 华北 | 商品M | 40000 | 北京63.0%,太原10%,其他27.0% |
| 东北 | 商品J | 92000 | 大连28%,辽宁17.0%,其他 55.0% |
SELECt * FROM ( SELECt *, rank() over ( PARTITION BY area ORDER BY clickCount DESC ) AS rank FROM ( SELECt area, product_name, count(*) AS clickCount FROM ( SELECt a.*, c.area, p.product_name FROM user_visit_action a JOIN city_info c ON a.city_id = c.city_id JOIN product_info p ON a.click_product_id = p.product_id WHERe a.click_product_id != - 1 ) t1 GROUP BY area, product_name ) t2 ) t3 WHERe rank <= 3
ideal代码
package com.pihao.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
object SparkSQL08_Req {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "atguigu") //放在代码最前面
val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")
val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()
import spark.implicits._
spark.sql("use sprakSqlReq")
spark.sql(
"""
|SELECt
| a.*,
| c.area,
| c.city_name,
| p.product_name
| FROM
| user_visit_action a
| JOIN city_info c ON a.city_id = c.city_id
| JOIN product_info p ON a.click_product_id = p.product_id
| WHERe
| a.click_product_id != -1
|""".stripMargin).createOrReplaceTempView("t1")
//聚合函数:同一个区域中多个城市名称 => 城市备注
spark.udf.register("cityRemark",functions.udaf(new CityRemarkUDAF))
spark.sql(
"""
|SELECT
| area,
| product_name,
| count(*) AS clickCount,
| cityRemark(city_name) as cityRemark
| FROM
| t1
| GROUP BY
| area,
| product_name
|""".stripMargin).createOrReplaceTempView("t2")
// spark.sql("select * from t2").show()
spark.sql(
"""
|SELECt
| *,
| rank() over ( PARTITION BY area ORDER BY clickCount DESC ) AS rank
| FROM
| t2
|""".stripMargin).createOrReplaceTempView("t3")
spark.sql(
"""
|SELECT
| *
|FROM
| t3
|WHERe
| rank <= 3
|""".stripMargin).show()
spark.stop()
}
case class CityBuffer(var total:Long,var cityMap:mutable.Map[String,Long])
class CityRemarkUDAF extends Aggregator[String,CityBuffer,String]{
//初始化
override def zero: CityBuffer = {
CityBuffer(0L,mutable.Map[String,Long]())
}
//聚合
override def reduce(buff: CityBuffer, cityName: String): CityBuffer = {
buff.total += 1
val newVal = buff.cityMap.getOrElse(cityName,0L) +1
buff.cityMap.update(cityName,newVal)
buff
}
//缓冲区的合并
override def merge(b1: CityBuffer, b2: CityBuffer): CityBuffer = {
b1.total += b2.total
b2.cityMap.foreach {
case (cityName, cnt) => {
val newCount = b1.cityMap.getOrElse(cityName,0L) + cnt
b1.cityMap.update(cityName,newCount)
}
}
b1
}
//怎么生成remark
override def finish(buff: CityBuffer): String = {
val cityClickList: ListBuffer[String] = ListBuffer[String]()
val cityDataList: List[(String, Long)] = buff.cityMap.toList.sortBy(_._2)(Ordering.Long.reverse).take(2)
val totalCnt: Long = buff.total
val hasMoreCity = buff.cityMap.size > 2
var p = 100L //总的比率 100%
cityDataList.foreach{
case (city,cnt) => {
var r = cnt * 100/totalCnt //单个城市的点击占比
if (hasMoreCity){
p = p -r
}
val s = city + " " + r + "%"
cityClickList.append(s)
}
}
if (hasMoreCity){
cityClickList.append("其他 "+ p + "%")
}
cityClickList.mkString(",")
}
override def bufferEncoder: Encoder[CityBuffer] = Encoders.product
override def outputEncoder: Encoder[String] = Encoders.STRING
}
}
//效果
+----+------------+----------+---------------------------+----+
|area|product_name|clickCount| cityRemark|rank|
+----+------------+----------+---------------------------+----+
|华南| 商品_23| 224| 厦门 29%,福州 24%,其他 47%| 1|
|华南| 商品_65| 222| 深圳 27%,厦门 26%,其他 47%| 2|
|华南| 商品_50| 212| 福州 27%,深圳 25%,其他 48%| 3|
|华北| 商品_42| 264| 郑州 25%,保定 25%,其他 50%| 1|
|华北| 商品_99| 264| 北京 24%,郑州 23%,其他 53%| 1|
|华北| 商品_19| 260| 郑州 23%,保定 20%,其他 57%| 3|
+----+------------+----------+---------------------------+----+
发现框架的最后都变成写SQL了



