准备工作:需求:最终效果解题思路:SparkSqlOnHive的UDAF实现代码
1、pom.xml配置2、创建UDAF类2、创建TopN类3、运行结果
准备工作:--创建表
CREATE TABLE `user_visit_action`
(
`date` string,
`user_id` bigint,
`session_id` string,
`page_id` bigint,
`action_time` string,
`search_keyword` string,
`click_category_id` bigint,
`click_product_id` bigint, --点击商品id,没有商品用-1表示。
`order_category_ids` string,
`order_product_ids` string,
`pay_category_ids` string,
`pay_product_ids` string,
`city_id` bigint --城市id
)
row format delimited fields terminated by 't';
CREATE TABLE `city_info`
(
`city_id` bigint, --城市id
`city_name` string, --城市名称
`area` string --区域名称
)
row format delimited fields terminated by 't';
CREATE TABLE `product_info`
(
`product_id` bigint, -- 商品id
`product_name` string, --商品名称
`extend_info` string
)
row format delimited fields terminated by 't';
-- 准备数据
--/home/lqs/module/spark-local/datas
load data local inpath '/home/lqs/module/spark-local/datas/user_visit_action.txt' into table user_visit_action;
load data local inpath '/home/lqs/module/spark-local/datas/product_info.txt' into table product_info;
load data local inpath '/home/lqs/module/spark-local/datas/city_info.txt' into table city_info;
--查询数据
select *
from user_visit_action
limit 5;
select *
from product_info
limit 5;
select *
from city_info
limit 5;
需求:最终效果
| 地区 | 商品名称 | 点击次数 | 城市备注 |
|---|---|---|---|
| 华北 | 商品A | 100000 | 北京21.2%,天津13.2%,其他65.6% |
| 华北 | 商品P | 80200 | 北京63.0%,太原10%,其他27.0% |
| 华北 | 商品M | 40000 | 北京63.0%,太原10%,其他27.0% |
| 东北 | 商品J | 92000 | 大连28%,辽宁17.0%,其他 55.0% |
--对三个数据表进行内连接
select c.area,-- 地区
c.city_name,
p.product_name,
v.click_product_id -- 点击商品id
from (
select click_product_id, city_id
from user_visit_action
where click_product_id > -1
) v
join city_info c on v.city_id = c.city_id
join product_info p on v.click_product_id = p.product_id;
--对数据进行地区、城市名分组统计
select t1.area,
t1.product_name,
count(*) click_count
from (
select c.area,-- 地区
c.city_name,
p.product_name,
v.click_product_id -- 点击商品id
from (
select click_product_id, city_id
from user_visit_action
where click_product_id > -1
) v
join city_info c on v.city_id = c.city_id
join product_info p on v.click_product_id = p.product_id
) t1
group by t1.area, t1.product_name;
--按照大区分区,按照点击次数倒序排序,给每个商品编号
select t2.area,
t2.product_name,
t2.click_count,
rank() over (partition by t2.area order by t2.click_count desc ) rk
from (
select t1.area,
t1.product_name,
count(*) click_count
from (
select c.area,-- 地区
c.city_name,
p.product_name,
v.click_product_id -- 点击商品id
from (
select click_product_id, city_id
from user_visit_action
where click_product_id > -1
) v
join city_info c on v.city_id = c.city_id
join product_info p on v.click_product_id = p.product_id
) t1
group by t1.area, t1.product_name
) t2;
--按照rk过滤数据 求出每个大区的前3名
select t3.area,
t3.product_name,
t3.click_count,
t3.rk
from (
select t2.area,
t2.product_name,
t2.click_count,
rank() over (partition by t2.area order by t2.click_count desc ) rk
from (
select t1.area,
t1.product_name,
count(*) click_count
from (
select c.area,-- 地区
c.city_name,
p.product_name,
v.click_product_id -- 点击商品id
from (
select click_product_id, city_id
from user_visit_action
where click_product_id > -1
) v
join city_info c on v.city_id = c.city_id
join product_info p on v.click_product_id = p.product_id
) t1
group by t1.area, t1.product_name
) t2
) t3
where t3.rk <= 3;
--实现城市备注,需要走UDAF
select t3.area,
t3.product_name,
t3.click_count
--t3.city_remark
from (
select t2.area,
t2.product_name,
t2.click_count,
rank() over (partition by t2.area order by t2.click_count desc ) rk
from (
select t1.area,
t1.product_name,
count(*) click_count
--cityRemark(t1.city_name) city_remark --需要走UDAF
from (
select c.area,-- 地区
c.city_name,
p.product_name,
v.click_product_id -- 点击商品id
from (
select click_product_id, city_id
from user_visit_action
where click_product_id > -1
) v
join city_info c on v.city_id = c.city_id
join product_info p on v.click_product_id = p.product_id
) t1
group by t1.area, t1.product_name
) t2
) t3
where t3.rk <= 3;
use db_spark_sql
SparkSqlOnHive的UDAF实现代码
1、pom.xml配置
2、创建UDAF类4.0.0 com.lqs SparkSqlOnHive 1.0-SNAPSHOT 8 8 org.apache.spark spark-sql_2.12 3.0.0 mysql mysql-connector-java 5.1.27 org.apache.spark spark-hive_2.12 3.0.0
package com.lqs.sparksqlonhive.casecombat
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
class CityRemarkUDAF extends Aggregator[String, Buff, String] {
override def zero: Buff = Buff(0L, mutable.Map[String, Long]())
override def reduce(b: Buff, city: String): Buff = {
//总点击次数
b.totalCount += 1
//每个城市点击次数
val newCount: Long = b.cityMap.getOrElse(city, 0L) + 1
b.cityMap.update(city, newCount)
b
}
override def merge(b1: Buff, b2: Buff): Buff = {
//合并所有城市的点击数量的总和
b1.totalCount += b2.totalCount
//合并城市Map(2个Map合并)
b2.cityMap.map {
case (city, count) => {
val newCount: Long = b1.cityMap.getOrElse(city, 0L) + count
b1.cityMap.update(city, newCount)
}
}
b1
}
override def finish(reduction: Buff): String = {
val remarkList: ListBuffer[String] = ListBuffer[String]()
//将统计的城市点击数量的集合进行排序,并取出前两名
// val cityCountList: List[(String, Long)] = reduction.cityMap.toList.sortWith(_._2 > _._2).take(2)
val cityCountList: List[(String, Long)] = reduction.cityMap.toList.sortBy(_._2)(Ordering[Long].reverse).take(2)
//定义一个百分比累加变量
var sum: Double = 0.0
//计算数前两名的百分比
cityCountList.foreach {
case (city, count) => {
val doubleResult: Double = (count * 100).toDouble / reduction.totalCount
//注意不能使用substring()来切,因为有些结果只有50.0
// println(doubleResult.toString.substring(0,6))
// println(doubleResult.toString.take(6))
remarkList.append(city + " " + doubleResult.toString.take(5) + "%")
(sum += doubleResult)
}
}
//进行健壮性判断,如果城市数大于2,才有其他的表示项
if (reduction.cityMap.size > 2) {
remarkList.append("其它 " + (100 - sum).toString.take(5) + "%")
}
remarkList.mkString("t")
}
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[String] = Encoders.STRING
}
case class Buff(var totalCount: Long, cityMap: mutable.Map[String, Long])
2、创建TopN类
package com.lqs.sparksqlonhive.casecombat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, functions}
object SparkSQL13_TopN {
def main(args: Array[String]): Unit = {
//TODO 1 创建SparkConf配置文件,并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkSqlCaseCombat").setMaster("local[*]")
System.setProperty("HADOOP_USER_NAME", "lqs")
//TODO 2 利用SparkConf创建sparkSession对象
//enableHiveSupport() 连接外部hive
val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
//注册自定义聚合函数
spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
spark.sql("use db_spark_sql;")
//执行是parkSQL
spark.sql(
"""
|select t3.area,
| t3.product_name,
| t3.click_count,
| t3.city_remark
|from (
| select t2.area,
| t2.product_name,
| t2.click_count,
| t2.city_remark,
| rank() over (partition by t2.area order by t2.click_count desc ) rk
| from (
| select t1.area,
| t1.product_name,
| count(*) click_count,
| cityRemark(t1.city_name) city_remark
| from (
| select c.area,-- 地区
| c.city_name,
| p.product_name,
| v.click_product_id -- 点击商品id
| from (
| select click_product_id, city_id
| from user_visit_action
| where click_product_id > -1
| ) v
| join city_info c on v.city_id = c.city_id
| join product_info p on v.click_product_id = p.product_id
| ) t1
| group by t1.area, t1.product_name
| ) t2
| ) t3
|where t3.rk <= 3;
|""".stripMargin
).show(1000, false)
//TODO 3 关闭资源
spark.stop()
}
}
3、运行结果
相关数据请点击这里进行下载。
资料来源:尚硅谷



