栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark 2.4.8 Top10热门商品小项目实战

Spark 2.4.8 Top10热门商品小项目实战

Spark 2.4.8 Top10小项目实战
    • 一、准备数据
    • 二、需求分析
    • 三、设计
    • 四、编码实现
    • 五、实验要求

一、准备数据

本实战项目的数据是采集自电商的用户行为数据.
主要包含用户的 4 种行为: 搜索, 点击, 下单和支付.
数据格式如下, 不同的字段使用下划线分割开_:

数据说明:

  1. 数据采用_分割字段
  2. 每一行表示用户的一个点击行为, 所以每一行只能是四种行为中的一种.
  3. 如果搜索关键字是 null, 表示这次不是搜索
  4. 如果点击的品类 id 和产品 id 是 -1 表示这次不是点击
  5. 下单行为来说一次可以下单多个产品, 所以品类 id 和产品 id 都是多个, id 之间使用逗号,分割. 如果本次不是下单行为, 则他们相关数据用null来表示
  6. 支付行为和下单行为类似.

注意:大家可以自行按照数据格式准备数据

二、需求分析
  • 需求说明

品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.

  • 分析思路一

分别统计每个品类点击的次数, 下单的次数和支付的次数.
缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常的耗时

  • 分析思路二

最好的办法应该是遍历一次能够计算出来上述的 3 个指标.
使用累加器可以达成我们的需求.
1.遍历全部日志数据, 根据品类 id 和操作类型分别累加. 需要用到累加器
•定义累加器
•当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
2.遍历完成之后就得到每个品类 id 和操作类型的数量.
3.按照点击下单支付的顺序来排序
4.取出 Top10

三、设计

程序设计逻辑图和流程图如下所示

四、编码实现
  • 实现方式一
  1. 使用case class 用来封装用户行为的bean类
    	
    case class UserVisitAction(date: String,
                               user_id: Long,
                               session_id: String,
                               page_id: Long,
                               action_time: String,
                               search_keyword: String,
                               click_category_id: Long,
                               click_product_id: Long,
                               order_category_ids: String,
                               order_product_ids: String,
                               pay_category_ids: String,
                               pay_product_ids: String,
                               city_id: Long)
    case class CategoryCountInfo(categoryId: String,
                                 clickCount: Long,
                                 orderCount: Long,
                                 payCount: Long)
    
  2. 定义用到的累加器:需要统计每个品类的点击量, 下单量和支付量, 所以我们在累加器中使用 Map 来存储这些数据: Map(cid, “click”-> 100, cid, “order”-> 50, ….)
    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable
    
    class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] {
        val map: mutable.Map[(String, String), Long] = mutable.Map[(String, String), Long]()
        
        override def isZero: Boolean = map.isEmpty
        
        override def copy(): AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] = {
            val newAcc = new MapAccumulator
            map.synchronized {
                newAcc.map ++= map
            }
            newAcc
        }
        override def reset(): Unit = map.clear
        override def add(v: (String, String)): Unit = {
            map(v) = map.getOrElseUpdate(v, 0) + 1
        }
        // otherMap: (1, click) -> 20 this: (1, click) -> 10 thisMap: (1,2) -> 30
        // otherMap: (1, order) -> 5 thisMap: (1,3) -> 5
        override def merge(other: AccumulatorV2[(String, String), mutable.Map[(String, String), Long]]): Unit = {
            val otherMap: mutable.Map[(String, String), Long] = other.value
            otherMap.foreach {
                kv => map.put(kv._1, map.getOrElse(kv._1, 0L) + kv._2)
            }
        }
        override def value: mutable.Map[(String, String), Long] = map
    }
    
  3. 将Top10的具体实现封装起来,如下:
    import com.atguigu.practice.app.acc.MapAccumulator
    import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    
    import scala.collection.mutable
    
    object CategoryTop10App {
        def statCategoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]): List[CategoryCountInfo] = {
            // 1. 注册累加器
            val acc = new MapAccumulator
            sc.register(acc, "CategoryActionAcc")
            
            // 2. 遍历日志
            userVisitActionRDD.foreach {
                visitAction => {
                    if (visitAction.click_category_id != -1) {
                        acc.add((visitAction.click_category_id.toString, "click"))
                    } else if (visitAction.order_category_ids != "null") {
                        visitAction.order_category_ids.split(",").foreach {
                            oid => acc.add((oid, "order"))
                        }
                    } else if (visitAction.pay_category_ids != "null") {
                        visitAction.pay_category_ids.split(",").foreach {
                            pid => acc.add((pid, "pay"))
                        }
                    }
                }
            }
            
            // 3. 遍历完成之后就得到每个每个品类 id 和操作类型的数量. 然后按照 CategoryId 进行进行分组
            val actionCountByCategoryIdMap: Map[String, mutable.Map[(String, String), Long]] = acc.value.groupBy(_._1._1)
            
            // 4. 转换成 CategoryCountInfo 类型的集合, 方便后续处理
            val categoryCountInfoList: List[CategoryCountInfo] = actionCountByCategoryIdMap.map {
                case (cid, actionMap) => CategoryCountInfo(
                    cid,
                    actionMap.getOrElse((cid, "click"), 0),
                    actionMap.getOrElse((cid, "order"), 0),
                    actionMap.getOrElse((cid, "pay"), 0)
                )
            }.toList
            
            // 5. 按照 点击 下单 支付 的顺序降序来排序
            val sortedCategoryInfoList: List[CategoryCountInfo] = categoryCountInfoList.sortBy(info => (info.clickCount, info.orderCount, info.payCount))(Ordering.Tuple3(Ordering.Long.reverse, Ordering.Long.reverse, Ordering.Long.reverse))
            
            // 6. 截取前 10
            val top10: List[CategoryCountInfo] = sortedCategoryInfoList.take(10)
            // 7. 返回 top10 品类 id
            top10
        }
    }
    
  4. 编写入口程序,如下:
    import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object PracticeApp {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("Practice").setMaster("local[2]")
            val sc = new SparkContext(conf)
            // 1. 读取文件中的数据
            val lineRDD: RDD[String] = sc.textFile("/Users/lzc/Desktop/user_visit_action.txt")
            // 2. 类型调整
            val userVisitActionRDD: RDD[UserVisitAction] = lineRDD.map(line => {
                val splits: Array[String] = line.split("_")
                UserVisitAction(
                    splits(0),
                    splits(1).toLong,
                    splits(2),
                    splits(3).toLong,
                    splits(4),
                    splits(5),
                    splits(6).toLong,
                    splits(7).toLong,
                    splits(8),
                    splits(9),
                    splits(10),
                    splits(11),
                    splits(12).toLong)
            })
            
            // 调用Top10实现类完成对热门商品的统计
            val categoryTop10: List[CategoryCountInfo] = CategoryTop10App.statCategoryTop10(sc, userVisitActionRDD)
            println(CategoryCountInfoList)
            
            // 停止sc
            sc.stop()
        }
    }
    
五、实验要求

请各位自行完成Top10热门品类中每个品类的 Top10 活跃 Session 统计

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/311449.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号