栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark学习之SparkSQL

spark学习之SparkSQL

SparkSQL SparkSQL概述

Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块。

SparkSQL编程 Dataframe编程 sparkSession
Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL其实可以理解为对Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。

在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark框架会自动的创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样
Dataframe

Spark SQL的Dataframe API 允许我们使用 Dataframe 而不用必须去注册临时表或者生成 SQL 表达式。Dataframe API 既有 transformation操作也有action操作。

创建Dataframe

在Spark SQL中SparkSession是创建Dataframe和执行SQL的入口,创建Dataframe有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

1)从Spark数据源进行创建
# 查看Spark支持创建文件的数据源格式
scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

# 在spark的bin/data目录(windows环境)中创建user.json文件
{"username":"zhangsan","age":20}

# 读取json文件创建Dataframe
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.Dataframe = [age: bigint, username: string]
注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换

# 展示结果:show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
2)从RDD进行转换
在后续章节中讨论
3)从Hive Table进行查询返回
SQL语法

SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

1)读取JSON文件创建Dataframe
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.Dataframe = [age: bigint, username: string]

2)对Dataframe创建一个临时表(视图)
scala> df.createOrReplaceTempView("people")

3)通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.Dataframe = [age: bigint, name: string]

4)结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|     lisi|
| 40|   wangwu|
+---+--------+
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

5)对于Dataframe创建一个全局表
scala> df.createGlobalTempView("people")

6)通过SQL语句实现查询全表
scala> spark.sql("SELECt * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|     lisi|
| 40|   wangwu|
+---+--------+

scala> spark.newSession().sql("SELECt * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|     lisi|
| 40|   wangwu|
+---+--------+
DSL语法

Dataframe提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

1)创建一个Dataframe
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.Dataframe = [age: bigint, name: string]

2)查看Dataframe的Schema信息
scala> df.printSchema
root
 |-- age: Long (nullable = true)
 |-- username: string (nullable = true)
 
3)只查看"username"列数据,
scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
|     lisi|
|   wangwu|
+--------+

4)查看"username"列数据以及"age+1"数据 
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> df.select($"username",$"age" + 1).show
scala> df.select('username, 'age + 1).show()
scala> df.select('username, 'age + 1 as "newage").show()


+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|        21|
|     lisi|        31|
|  wangwu|         41|
+--------+---------+

5)查看"age"大于"30"的数据
scala> df.filter($"age">30).show
+---+---------+
|age| username|
+---+---------+
| 40|    wangwu|
+---+---------+

6)按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 20|    1|
| 30|    1|
| 40|    1|
+---+-----+
DataSet编程

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

创建DataSet
1)使用样例类序列创建DataSet
scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]

scala> caseClassDS.show
+---------+---+
|     name|age|
+---------+---+
| zhangsan|  2|
+---------+---+
2)使用基本类型的序列创建DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+
RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
DataSet转换为RDD

DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at :25

scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))
Dataframe和 DataSet转换

Dataframe其实是DataSet的特例,所以它们之间是可以互相转换的。

# Dataframe转换为DataSet   as[T]
scala> case class User(name:String, age:Int)
defined class User

scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.Dataframe = [name: string, age: int]

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]


# DataSet转换为Dataframe	 toDF
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val df = ds.toDF
df: org.apache.spark.sql.Dataframe = [name: string, age: int]

IDEA开发SparkSQL 基本使用
package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}

object SparkSQL02 {
  def main(args: Array[String]): Unit = {
      
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
        
    //SparkSession
    //Scala中构造方法前端加上private关键字,表示构造方法私有
    //scala中构造方法分为主构造方法与辅助构造方法
    //只有主构造方法才能完成类的初始化,辅助构造方法必须显示或者间接调用主构造方法
    //SparkSession无法直接构建对象,需要通过Builder构建  
    val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()

    //简单操作
    //注意:spark读取json文件中的方式是按行读取,每一行都是json
    val frame: Dataframe = spark.read.json("data/user.json")
    //查询
    frame.show
    //将DF转为试图
    frame.createTempView("user")
    //采用sql查询
    spark.sql("select avg(age) from user").show

    //采用DSL语法
    frame.select("username").show

    //使用$引用
    // 进行操作时,必须使用隐式转换,导入对象的内容,必须使用var修饰
    import spark.implicits._
    frame.select($"username",$"age" + 1).show

    spark.stop()
  }

}

rdd,df,ds相互转换
package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}

object SparkSQL03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
    import spark.implicits._

    val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
      List(
        (1, "zhangsan", 20),
        (1, "list", 20),
        (1, "wangwu", 20)
      )
    )
    
    // RDD => Dataframe   增加结构
    // Dataframe就是一个DataSet[Row],但是类型时固定的: Row
    val df: Dataframe = rdd.toDF("id","name","age")
    
    // Dataframe => DataSet  增加类型
    val ds: Dataset[User] = df.as[User]
    
    //DataSet => Dataframe
    val frame: Dataframe = ds.toDF()
    
    //Dataframe => rdd
    val newRdd: RDD[Row] = frame.rdd
    
    // RDD => DataSet 增加结构与类型
    val newDS: Dataset[User] = rdd.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }.toDS()
    
    // DataSet => RDD
    val rdd1: RDD[User] = newDS.rdd
    
    
    spark.stop()
  }

  case class User(id:Int,name:String,age:Int)

}

用户自定义函数

用户可以通过spark.udf功能添加自定义函数,实现自定义功能。

UDF

package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}


object SparkSQL04_UDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
    import spark.implicits._

    val frame: Dataframe = spark.read.json("data/user.json")
    frame.createTempView("user")

    //自定义函数
    //可以将udf函数理解为map功能函数
    spark.udf.register("PrefixNmae",(name:String)=>{"用户姓名: "+name})
    spark.sql("select PrefixNmae(username) from user").show
    
  }
}

用户自定义聚合函数
package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataframe, Encoder, Encoders, SparkSession, functions}


object SparkSQL05_UDAF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate()
    import spark.implicits._

    val frame: Dataframe = spark.read.json("data/user.json")
    frame.createTempView("user")

    //UDF不需要聚合数据,所以轮询的每一条数据处理完毕就结束了
    //UDAF需要聚合数据,所以轮询时,需要将数据处理的状态保存起来,保存在缓冲区

    //sql:弱类型操作,Dataframe
    //Aggregator:强类型操作 DataSet

    spark.udf.register("ageAvg",functions.udaf(new AgeAvgUDAF))
    spark.sql("select ageAvg(age) from user").show

    spark.stop()

  }

  //1.需要继承Aggregator
    // IN: 聚合函数的输入
    // BUFF: 聚合函数中用于计算的缓冲区类型
    // OUT: 聚合函数的输出
  //2.重写方法
    //逻辑(4)
    //编码(2)
  case class AvgBuff(var total:Long, var count:Long)
  class AgeAvgUDAF extends Aggregator[Long,AvgBuff,Long]{
    //返回初始化的缓冲区对象
    override def zero: AvgBuff = {
      AvgBuff(0,0)
    }

    //聚合:用当前的数据更新缓冲区
    override def reduce(buff: AvgBuff, age: Long): AvgBuff = {
      buff.total = buff.total + age
      buff.count = buff.count + 1
      buff
    }

    //合并:分布式,多个缓冲区数据合并
    override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = {
      b1.total = b1.total + b2.total
      b1.count = b1.count + b2.count
      b1
    }

    //完成,计算聚合函数的值
    override def finish(buff: AvgBuff): Long = {
      buff.total / buff.count
    }

    override def bufferEncoder: Encoder[AvgBuff] = Encoders.product

    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }

}
SparkSQL数据的加载与保存 通用的加载和保存方式

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parquet

加载数据

# park.read.load 是加载数据的通用方法
scala> spark.read.

csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

# 如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")

- format("…"):指定加载的数据类型,包"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
- load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
- option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable

# 我们前面都是使用read API 先把文件加载到 Dataframe然后再查询,其实,我们也可以直接在文件上进行查询:  文件格式.`文件路径`

scala>spark.sql("select * from json.`/opt/module/data/user.json`").show

保存数据

# df.write.save 是保存数据的通用方法
scala>df.write.
csv  jdbc   json  orc   parquet textFile… …

# 如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")

format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable

保存操作可以使用 SaveMode, 用来指明如何处理数据,使用mode()方法来设置。
有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
SaveMode是一个枚举类,其中的常量包括:
Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)“error”(default)如果文件已经存在则抛出异常
SaveMode.Append“append”如果文件已经存在则追加
SaveMode.Overwrite“overwrite”如果文件已经存在则覆盖
SaveMode.Ignore“ignore”如果文件已经存在则忽略
Parqut

Spark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。

数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。

Json

Spark SQL 能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载JSON 文件。

注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。格式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]

1)导入隐式转换
import spark.implicits._

2)加载JSON文件
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path) # 可以直接展示出来

3)创建临时表
peopleDF.createOrReplaceTempView("people")

4)数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERe age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
|  name|
+------+
|Justin|
+------+
CSV

Spark SQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列

spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")
MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建Dataframe,通过对Dataframe一系列的计算后,还可以将数据再写回关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。

读数据

spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/gmall")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "123123")
  .option("dbtable", "sku_info")
  .load().show

写数据

//ds就是dataSet,已经存在了数据
ds.write.format("jdbc")
  .option("url","jdbc:mysql://hadoop102:3306/gmall")
  .option("user":"root")
  .option("password":"123123")
  .option("dbtable":"user")
  .mode("append") // 指定模式
  .save
Hive
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。

若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

内置hive

一般用于测试

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
Hive 的元数据存储在 derby 中, 默认仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
。。。
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> spark.sql("create table aa(id int)")

。。。

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|       aa|      false|
+--------+---------+-----------+
向表加载本地数据
scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")

。。。

scala> spark.sql("select * from aa").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
在实际使用中, 几乎没有任何人会使用内置的 Hive

外置hive

如果想连接外部已经部署好的Hive,需要通过以下几个步骤:

Ø Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下

Ø 把Mysql的驱动copy到jars/目录下

Ø 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下

Ø 重启spark-shell

scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|                 emp|      false|
| default|hive_hbase_emp_table|      false|
| default| relevance_hbase_emp|      false|
+--------+--------------------+-----------+

scala> 
IDEA连接hive

添加依赖


    org.apache.spark
    spark-hive_2.12
    3.0.0



    org.apache.hive
    hive-exec
    1.2.1


    mysql
    mysql-connector-java
    5.1.27

代码示例

package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSQL06_HIVE {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")
    val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()
    import spark.implicits._

    //TODO idea开发HIVE操作,需要遵循以下几个步骤
    //添加依赖关系
    //在创建环境对象时,启用Hive知此恨
    //将hive的配置文件放置在classpath中
    spark.sql("show tables").show

    spark.stop()
  }
}

spark-sql/spark-beeline

spark-sql

Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口

# 可以直接写sql了
bin/spark-sql

spark-beeline

Spark Thrift Server是Spark社区基于HiveServer2实现的一个Thrift服务。旨在无缝兼容HiveServer2。因为Spark Thrift Server的接口和协议都和HiveServer2完全一致,因此我们部署好Spark Thrift Server后,可以直接使用hive的beeline访问Spark Thrift Server执行相关语句。Spark Thrift Server的目的也只是取代HiveServer2,因此它依旧可以和Hive metastore进行交互,获取到hive的元数据。

# 如果想连接Thrift Server,需要通过以下几个步骤:
- Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
- 把Mysql的驱动copy到jars/目录下
- 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
- 启动Thrift Server
sbin/start-thriftserver.sh

# 使用beeline连接Thrift Server
bin/beeline -u jdbc:hive2://hadoop102:10000 -n root

SparkSQL项目实战 数据准备

我们这次 Spark-sql 操作中所有的数据均来自 Hive,首先在 Hive 中创建表,,并导入数据。

一共有3张表: 1张用户行为表,1张城市表,1 张产品表

CREATE TABLE `user_visit_action`(
  `date` string,	
  `user_id` bigint,
  `session_id` string,
  `page_id` bigint,
  `action_time` string,
  `search_keyword` string,
  `click_category_id` bigint,
  `click_product_id` bigint,
  `order_category_ids` string,
  `order_product_ids` string,
  `pay_category_ids` string,
  `pay_product_ids` string,
  `city_id` bigint)
row format delimited fields terminated by 't';
load data local inpath 'input/user_visit_action.txt' into table user_visit_action;

CREATE TABLE `product_info`(
  `product_id` bigint,
  `product_name` string,
  `extend_info` string)
row format delimited fields terminated by 't';
load data local inpath 'input/product_info.txt' into table product_info;

CREATE TABLE `city_info`(
  `city_id` bigint,
  `city_name` string,
  `area` string)
row format delimited fields terminated by 't';
load data local inpath 'input/city_info.txt' into table city_info;
// 在IDEA中创建数据,其中sprakSqlReq数据库已在hive的命令行窗口中创建完毕
package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSQL07_Req {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu") //放在代码最前面
    val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")
    val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()
    import spark.implicits._
    //生成模拟数据
    spark.sql("use sprakSqlReq")
    //JSON SQL一般采用多行字符串
    spark.sql(
      """
        |CREATE TABLE `user_visit_action`(
        |  `date` string,
        |  `user_id` bigint,
        |  `session_id` string,
        |  `page_id` bigint,
        |  `action_time` string,
        |  `search_keyword` string,
        |  `click_category_id` bigint,
        |  `click_product_id` bigint,
        |  `order_category_ids` string,
        |  `order_product_ids` string,
        |  `pay_category_ids` string,
        |  `pay_product_ids` string,
        |  `city_id` bigint)
        |row format delimited fields terminated by 't'
        |""".stripMargin)
    spark.sql(
      """
        |load data local inpath 'data/user_visit_action.txt' into table sprakSqlReq.user_visit_action
        |""".stripMargin);


    spark.sql(
      """
        |CREATE TABLE `product_info`(
        |  `product_id` bigint,
        |  `product_name` string,
        |  `extend_info` string)
        |row format delimited fields terminated by 't'
        |""".stripMargin)
    spark.sql(
      """
        |load data local inpath 'data/product_info.txt' into table sprakSqlReq.product_info
        |""".stripMargin);


    spark.sql(
      """
        |CREATE TABLE `city_info`(
        |  `city_id` bigint,
        |  `city_name` string,
        |  `area` string)
        |row format delimited fields terminated by 't'
        |""".stripMargin)
    spark.sql(
      """
        |load data local inpath 'data/city_info.txt' into table sprakSqlReq.city_info
        |""".stripMargin);

    spark.sql(
      """
        |select * from city_info
        |""".stripMargin).show(20) //只显示20行

  }

}

需求一,各区域热门商品 Top3

这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

地区商品名称点击次数城市备注
华北商品A100000北京21.2%,天津13.2%,其他65.6%
华北商品P80200北京63.0%,太原10%,其他27.0%
华北商品M40000北京63.0%,太原10%,其他27.0%
东北商品J92000大连28%,辽宁17.0%,其他 55.0%
SELECt
	* 
FROM
	(
	SELECt
		*,
		rank() over ( PARTITION BY area ORDER BY clickCount DESC ) AS rank 
	FROM
		(
		SELECt
			area,
			product_name,
			count(*) AS clickCount 
		FROM
			(
			SELECt
				a.*,
				c.area,
				p.product_name 
			FROM
				user_visit_action a
				JOIN city_info c ON a.city_id = c.city_id
				JOIN product_info p ON a.click_product_id = p.product_id 
			WHERe
				a.click_product_id != - 1 
			) t1 
		GROUP BY
			area,
			product_name 
		) t2 
	) t3 
WHERe
	rank <= 3

ideal代码

package com.pihao.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object SparkSQL08_Req {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu") //放在代码最前面
    val conf = new SparkConf().setMaster("local[*]").setAppName("Hive")
    val spark: SparkSession = SparkSession .builder().enableHiveSupport().config(conf).getOrCreate()
    import spark.implicits._

    spark.sql("use sprakSqlReq")
    spark.sql(
      """
        |SELECt
        |				a.*,
        |				c.area,
        |       c.city_name,
        |				p.product_name
        |			FROM
        |				user_visit_action a
        |				JOIN city_info c ON a.city_id = c.city_id
        |				JOIN product_info p ON a.click_product_id = p.product_id
        |			WHERe
        |				a.click_product_id != -1
        |""".stripMargin).createOrReplaceTempView("t1")

    //聚合函数:同一个区域中多个城市名称 => 城市备注
    spark.udf.register("cityRemark",functions.udaf(new CityRemarkUDAF))

    spark.sql(
      """
        |SELECT
        |			area,
        |			product_name,
        |			count(*) AS clickCount,
        |     cityRemark(city_name) as cityRemark
        |		FROM
        |			 t1
        |		GROUP BY
        |			area,
        |			product_name
        |""".stripMargin).createOrReplaceTempView("t2")

//    spark.sql("select * from t2").show()
    spark.sql(
      """
        |SELECt
        |		*,
        |		rank() over ( PARTITION BY area ORDER BY clickCount DESC ) AS rank
        |	FROM
        |		 t2
        |""".stripMargin).createOrReplaceTempView("t3")

    spark.sql(
      """
        |SELECT
        |	*
        |FROM
        |	 t3
        |WHERe
        |	rank <= 3
        |""".stripMargin).show()



    spark.stop()
  }

  case class CityBuffer(var total:Long,var cityMap:mutable.Map[String,Long])

  
  class CityRemarkUDAF extends Aggregator[String,CityBuffer,String]{
    //初始化
    override def zero: CityBuffer = {
      CityBuffer(0L,mutable.Map[String,Long]())
    }

    //聚合
    override def reduce(buff: CityBuffer, cityName: String): CityBuffer = {
      buff.total += 1
      val newVal = buff.cityMap.getOrElse(cityName,0L) +1
      buff.cityMap.update(cityName,newVal)
      buff
    }

    //缓冲区的合并
    override def merge(b1: CityBuffer, b2: CityBuffer): CityBuffer = {
      b1.total += b2.total
      b2.cityMap.foreach {
        case (cityName, cnt) => {
           val newCount = b1.cityMap.getOrElse(cityName,0L) + cnt
           b1.cityMap.update(cityName,newCount)
        }
      }
      b1
    }

    //怎么生成remark
    override def finish(buff: CityBuffer): String = {
      val cityClickList: ListBuffer[String] = ListBuffer[String]()

      val cityDataList: List[(String, Long)] = buff.cityMap.toList.sortBy(_._2)(Ordering.Long.reverse).take(2)
      val totalCnt: Long = buff.total
      val hasMoreCity =  buff.cityMap.size > 2
      var p = 100L //总的比率 100%
      cityDataList.foreach{
        case (city,cnt) => {
          var r = cnt * 100/totalCnt //单个城市的点击占比
          if (hasMoreCity){
            p = p -r
          }
          val s = city + " " + r + "%"
          cityClickList.append(s)
        }
      }
      if (hasMoreCity){
        cityClickList.append("其他 "+ p + "%")
      }
      cityClickList.mkString(",")
    }

    override def bufferEncoder: Encoder[CityBuffer] = Encoders.product

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }
}



//效果
+----+------------+----------+---------------------------+----+
|area|product_name|clickCount|                 cityRemark|rank|
+----+------------+----------+---------------------------+----+
|华南|     商品_23|       224| 厦门 29%,福州 24%,其他 47%|   1|
|华南|     商品_65|       222| 深圳 27%,厦门 26%,其他 47%|   2|
|华南|     商品_50|       212| 福州 27%,深圳 25%,其他 48%|   3|
|华北|     商品_42|       264| 郑州 25%,保定 25%,其他 50%|   1|
|华北|     商品_99|       264| 北京 24%,郑州 23%,其他 53%|   1|
|华北|     商品_19|       260| 郑州 23%,保定 20%,其他 57%|   3|
+----+------------+----------+---------------------------+----+

发现框架的最后都变成写SQL了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/439169.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号