
SparkSqlOnHive Project in Practice: Top 3 Hot Products per Region

Contents

- Preparation
- Requirement: expected result
- Solution approach
- SparkSqlOnHive UDAF implementation
  1. pom.xml configuration
  2. The UDAF class
  3. The TopN class
  4. Run results

Preparation:
-- Create the tables
CREATE TABLE `user_visit_action`
(
    `date`               string,
    `user_id`            bigint,
    `session_id`         string,
    `page_id`            bigint,
    `action_time`        string,
    `search_keyword`     string,
    `click_category_id`  bigint,
    `click_product_id`   bigint, -- clicked product id; -1 means no product was clicked
    `order_category_ids` string,
    `order_product_ids`  string,
    `pay_category_ids`   string,
    `pay_product_ids`    string,
    `city_id`            bigint  -- city id
)
    row format delimited fields terminated by '\t';


CREATE TABLE `city_info`
(
    `city_id`   bigint, -- city id
    `city_name` string, -- city name
    `area`      string  -- area name
)
    row format delimited fields terminated by '\t';


CREATE TABLE `product_info`
(
    `product_id`   bigint, -- product id
    `product_name` string, -- product name
    `extend_info`  string
)
    row format delimited fields terminated by '\t';

-- Load the data (files live under /home/lqs/module/spark-local/datas)

load data local inpath '/home/lqs/module/spark-local/datas/user_visit_action.txt' into table user_visit_action;
load data local inpath '/home/lqs/module/spark-local/datas/product_info.txt' into table product_info;
load data local inpath '/home/lqs/module/spark-local/datas/city_info.txt' into table city_info;

-- Spot-check the loaded data
select *
from user_visit_action
limit 5;
select *
from product_info
limit 5;
select *
from city_info
limit 5;
Requirement: expected result

Area   Product  Clicks   City remark
华北   商品A    100000   北京21.2%, 天津13.2%, 其他65.6%
华北   商品P    80200    北京63.0%, 太原10%, 其他27.0%
华北   商品M    40000    北京63.0%, 太原10%, 其他27.0%
东北   商品J    92000    大连28%, 辽宁17.0%, 其他55.0%
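The interesting part of the requirement is the city-remark column: the top two cities by click share, with everything else lumped into "其他". As a minimal, Spark-independent sketch (the object name and the sample counts below are invented for illustration, not from the original post), the remark for one (area, product) group could be computed from per-city click counts like this:

```scala
// Hypothetical, Spark-free sketch of the city-remark computation:
// take the top 2 cities by clicks, report each one's percentage share,
// and add an "其他" entry when more than two cities contributed.
object CityRemarkSketch {
  def remark(cityCounts: Map[String, Long]): String = {
    val total = cityCounts.values.sum.toDouble
    // sort cities by click count, descending, and keep the top two
    val top2 = cityCounts.toList.sortBy(-_._2).take(2)
    val parts = top2.map { case (city, n) => f"$city${n * 100 / total}%.1f%%" }
    // the remaining share goes into "其他"
    val rest = 100.0 - top2.map(_._2 * 100 / total).sum
    val all = if (cityCounts.size > 2) parts :+ f"其他$rest%.1f%%" else parts
    all.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    println(remark(Map("北京" -> 212, "天津" -> 132, "上海" -> 656)))
  }
}
```

The UDAF developed later in this post implements the same idea, but accumulates the per-city counts distributedly inside an aggregation buffer instead of receiving a finished Map.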
Solution approach:
-- Step 1: inner-join the three tables
select c.area,            -- area
       c.city_name,
       p.product_name,
       v.click_product_id -- clicked product id
from (
         select click_product_id, city_id
         from user_visit_action
         where click_product_id > -1
     ) v
         join city_info c on v.city_id = c.city_id
         join product_info p on v.click_product_id = p.product_id;

-- Step 2: group by area and product name and count the clicks
select t1.area,
       t1.product_name,
       count(*) click_count
from (
         select c.area,            -- area
                c.city_name,
                p.product_name,
                v.click_product_id -- clicked product id
         from (
                  select click_product_id, city_id
                  from user_visit_action
                  where click_product_id > -1
              ) v
                  join city_info c on v.city_id = c.city_id
                  join product_info p on v.click_product_id = p.product_id
     ) t1
group by t1.area, t1.product_name;

-- Step 3: partition by area, sort by click count descending, and rank each product
select t2.area,
       t2.product_name,
       t2.click_count,
       rank() over (partition by t2.area order by t2.click_count desc) rk
from (
         select t1.area,
                t1.product_name,
                count(*) click_count
         from (
                  select c.area,            -- area
                         c.city_name,
                         p.product_name,
                         v.click_product_id -- clicked product id
                  from (
                           select click_product_id, city_id
                           from user_visit_action
                           where click_product_id > -1
                       ) v
                           join city_info c on v.city_id = c.city_id
                           join product_info p on v.click_product_id = p.product_id
              ) t1
         group by t1.area, t1.product_name
     ) t2;

-- Step 4: filter on rk to keep the top 3 products of each area
select t3.area,
       t3.product_name,
       t3.click_count,
       t3.rk
from (
         select t2.area,
                t2.product_name,
                t2.click_count,
                rank() over (partition by t2.area order by t2.click_count desc) rk
         from (
                  select t1.area,
                         t1.product_name,
                         count(*) click_count
                  from (
                           select c.area,            -- area
                                  c.city_name,
                                  p.product_name,
                                  v.click_product_id -- clicked product id
                           from (
                                    select click_product_id, city_id
                                    from user_visit_action
                                    where click_product_id > -1
                                ) v
                                    join city_info c on v.city_id = c.city_id
                                    join product_info p on v.click_product_id = p.product_id
                       ) t1
                  group by t1.area, t1.product_name
              ) t2
     ) t3
where t3.rk <= 3;

-- Step 5: the city remark column requires a custom UDAF
select t3.area,
       t3.product_name,
       t3.click_count
       --t3.city_remark
from (
         select t2.area,
                t2.product_name,
                t2.click_count,
                rank() over (partition by t2.area order by t2.click_count desc) rk
         from (
                  select t1.area,
                         t1.product_name,
                         count(*) click_count
                         --cityRemark(t1.city_name) city_remark -- requires the custom UDAF
                  from (
                           select c.area,            -- area
                                  c.city_name,
                                  p.product_name,
                                  v.click_product_id -- clicked product id
                           from (
                                    select click_product_id, city_id
                                    from user_visit_action
                                    where click_product_id > -1
                                ) v
                                    join city_info c on v.city_id = c.city_id
                                    join product_info p on v.click_product_id = p.product_id
                       ) t1
                  group by t1.area, t1.product_name
              ) t2
     ) t3
where t3.rk <= 3;

-- Switch to the database that holds the three tables
use db_spark_sql;

SparkSqlOnHive UDAF implementation

1. pom.xml configuration


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lqs</groupId>
    <artifactId>SparkSqlOnHive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>

2. The UDAF class
package com.lqs.sparksqlonhive.casecombat

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer




/**
 * Custom UDAF: the input is a city name, the buffer accumulates the total
 * and per-city click counts, and the output is the formatted remark string.
 */
class CityRemarkUDAF extends Aggregator[String, Buff, String] {
  override def zero: Buff = Buff(0L, mutable.Map[String, Long]())

  override def reduce(b: Buff, city: String): Buff = {
    // total click count
    b.totalCount += 1

    // per-city click count
    val newCount: Long = b.cityMap.getOrElse(city, 0L) + 1
    b.cityMap.update(city, newCount)
    b
  }

  override def merge(b1: Buff, b2: Buff): Buff = {
    // combine the total click counts
    b1.totalCount += b2.totalCount

    // merge the two city maps into b1
    b2.cityMap.foreach {
      case (city, count) =>
        val newCount: Long = b1.cityMap.getOrElse(city, 0L) + count
        b1.cityMap.update(city, newCount)
    }
    b1
  }

  override def finish(reduction: Buff): String = {
    val remarkList: ListBuffer[String] = ListBuffer[String]()

    // Sort the per-city click counts in descending order and keep the top two
    val cityCountList: List[(String, Long)] = reduction.cityMap.toList.sortBy(_._2)(Ordering[Long].reverse).take(2)

    // Running total of the top-two percentages
    var sum: Double = 0.0

    // Compute the percentage for each of the top two cities
    cityCountList.foreach {
      case (city, count) =>
        val doubleResult: Double = (count * 100).toDouble / reduction.totalCount
        // Note: do not trim with substring(0, 6) -- it throws when the result
        // is short, e.g. "50.0"; take(5) is safe for any length
        remarkList.append(city + " " + doubleResult.toString.take(5) + "%")
        sum += doubleResult
    }

    // Robustness check: an "other" entry only makes sense with more than two cities
    if (reduction.cityMap.size > 2) {
      remarkList.append("其它 " + (100 - sum).toString.take(5) + "%")
    }

    remarkList.mkString("\t")
  }

  override def bufferEncoder: Encoder[Buff] = Encoders.product

  override def outputEncoder: Encoder[String] = Encoders.STRING
}


case class Buff(var totalCount: Long, cityMap: mutable.Map[String, Long])
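The take(5) trimming in finish is worth a closer look: the original post notes that an earlier attempt with substring(0, 6) was abandoned because some results, such as "50.0", are shorter than six characters. A tiny standalone sketch (the object and method names are invented for illustration) of why take is the safe choice:

```scala
// Hypothetical standalone sketch: trimming a percentage string.
// substring(0, 6) throws StringIndexOutOfBoundsException when the string
// is shorter than 6 characters (e.g. "50.0"); take(5) never throws.
object TrimDemo {
  def pct(count: Long, total: Long): String =
    ((count * 100).toDouble / total).toString.take(5) + "%"

  def main(args: Array[String]): Unit = {
    println(pct(50, 100))   // short result: "50.0" would break substring(0, 6)
    println(pct(212, 1000)) // longer results get trimmed to 5 characters
  }
}
```

String.take is total on any length, so the formatting never depends on how many digits the division happens to produce.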
3. The TopN class
package com.lqs.sparksqlonhive.casecombat

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, functions}


object SparkSQL13_TopN {
  def main(args: Array[String]): Unit = {
    //TODO 1: create the SparkConf and set the app name
    val conf: SparkConf = new SparkConf().setAppName("SparkSqlCaseCombat").setMaster("local[*]")

    System.setProperty("HADOOP_USER_NAME", "lqs")

    //TODO 2: build the SparkSession from the SparkConf
    //enableHiveSupport() connects to the external Hive metastore
    val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

    //register the custom aggregate function as cityRemark
    spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))

    //note: no trailing semicolon -- spark.sql may fail to parse one
    spark.sql("use db_spark_sql")

    //run the SparkSQL query
    spark.sql(
      """
        |select t3.area,
        |       t3.product_name,
        |       t3.click_count,
        |       t3.city_remark
        |from (
        |         select t2.area,
        |                t2.product_name,
        |                t2.click_count,
        |                t2.city_remark,
        |                rank() over (partition by t2.area order by t2.click_count desc ) rk
        |         from (
        |                  select t1.area,
        |                         t1.product_name,
        |                         count(*) click_count,
        |                         cityRemark(t1.city_name) city_remark
        |                  from (
        |                           select c.area,            -- area
        |                                  c.city_name,
        |                                  p.product_name,
        |                                  v.click_product_id -- clicked product id
        |                           from (
        |                                    select click_product_id, city_id
        |                                    from user_visit_action
        |                                    where click_product_id > -1
        |                                ) v
        |                                    join city_info c on v.city_id = c.city_id
        |                                    join product_info p on v.click_product_id = p.product_id
        |                       ) t1
        |                  group by t1.area, t1.product_name
        |              ) t2
        |     ) t3
        |where t3.rk <= 3
        |""".stripMargin
    ).show(1000, false)

    //TODO 3 关闭资源
    spark.stop()
  }
}
4. Run results


Source: 尚硅谷
