- 一、准备数据
- 二、需求分析
- 三、设计
- 四、编码实现
- 五、实验要求
本实战项目的数据是采集自电商的用户行为数据.
主要包含用户的 4 种行为: 搜索, 点击, 下单和支付.
数据格式如下, 不同的字段使用下划线分割开_:
数据说明:
- 数据采用_分割字段
- 每一行表示用户的一个点击行为, 所以每一行只能是四种行为中的一种.
- 如果搜索关键字是 null, 表示这次不是搜索
- 如果点击的品类 id 和产品 id 是 -1 表示这次不是点击
- 下单行为来说一次可以下单多个产品, 所以品类 id 和产品 id 都是多个, id 之间使用逗号,分割. 如果本次不是下单行为, 则他们相关数据用null来表示
- 支付行为和下单行为类似.
注意:大家可以自行按照数据格式准备数据
二、需求分析- 需求说明
品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.
- 分析思路一
分别统计每个品类点击的次数, 下单的次数和支付的次数.
缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常的耗时
- 分析思路二
三、设计最好的办法应该是遍历一次能够计算出来上述的 3 个指标.
使用累加器可以达成我们的需求.
1.遍历全部日志数据, 根据品类 id 和操作类型分别累加. 需要用到累加器
•定义累加器
•当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
2.遍历完成之后就得到每个品类 id 和操作类型的数量.
3.按照点击下单支付的顺序来排序
4.取出 Top10
程序设计逻辑图和流程图如下所示
- 实现方式一
- 使用case class 用来封装用户行为的bean类
case class UserVisitAction(date: String, user_id: Long, session_id: String, page_id: Long, action_time: String, search_keyword: String, click_category_id: Long, click_product_id: Long, order_category_ids: String, order_product_ids: String, pay_category_ids: String, pay_product_ids: String, city_id: Long) case class CategoryCountInfo(categoryId: String, clickCount: Long, orderCount: Long, payCount: Long) - 定义用到的累加器:需要统计每个品类的点击量, 下单量和支付量, 所以我们在累加器中使用 Map 来存储这些数据: Map(cid, “click”-> 100, cid, “order”-> 50, ….)
import org.apache.spark.util.AccumulatorV2 import scala.collection.mutable class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] { val map: mutable.Map[(String, String), Long] = mutable.Map[(String, String), Long]() override def isZero: Boolean = map.isEmpty override def copy(): AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] = { val newAcc = new MapAccumulator map.synchronized { newAcc.map ++= map } newAcc } override def reset(): Unit = map.clear override def add(v: (String, String)): Unit = { map(v) = map.getOrElseUpdate(v, 0) + 1 } // otherMap: (1, click) -> 20 this: (1, click) -> 10 thisMap: (1,2) -> 30 // otherMap: (1, order) -> 5 thisMap: (1,3) -> 5 override def merge(other: AccumulatorV2[(String, String), mutable.Map[(String, String), Long]]): Unit = { val otherMap: mutable.Map[(String, String), Long] = other.value otherMap.foreach { kv => map.put(kv._1, map.getOrElse(kv._1, 0L) + kv._2) } } override def value: mutable.Map[(String, String), Long] = map } - 将Top10的具体实现封装起来,如下:
import com.atguigu.practice.app.acc.MapAccumulator import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import scala.collection.mutable object CategoryTop10App { def statCategoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]): List[CategoryCountInfo] = { // 1. 注册累加器 val acc = new MapAccumulator sc.register(acc, "CategoryActionAcc") // 2. 遍历日志 userVisitActionRDD.foreach { visitAction => { if (visitAction.click_category_id != -1) { acc.add((visitAction.click_category_id.toString, "click")) } else if (visitAction.order_category_ids != "null") { visitAction.order_category_ids.split(",").foreach { oid => acc.add((oid, "order")) } } else if (visitAction.pay_category_ids != "null") { visitAction.pay_category_ids.split(",").foreach { pid => acc.add((pid, "pay")) } } } } // 3. 遍历完成之后就得到每个每个品类 id 和操作类型的数量. 然后按照 CategoryId 进行进行分组 val actionCountByCategoryIdMap: Map[String, mutable.Map[(String, String), Long]] = acc.value.groupBy(_._1._1) // 4. 转换成 CategoryCountInfo 类型的集合, 方便后续处理 val categoryCountInfoList: List[CategoryCountInfo] = actionCountByCategoryIdMap.map { case (cid, actionMap) => CategoryCountInfo( cid, actionMap.getOrElse((cid, "click"), 0), actionMap.getOrElse((cid, "order"), 0), actionMap.getOrElse((cid, "pay"), 0) ) }.toList // 5. 按照 点击 下单 支付 的顺序降序来排序 val sortedCategoryInfoList: List[CategoryCountInfo] = categoryCountInfoList.sortBy(info => (info.clickCount, info.orderCount, info.payCount))(Ordering.Tuple3(Ordering.Long.reverse, Ordering.Long.reverse, Ordering.Long.reverse)) // 6. 截取前 10 val top10: List[CategoryCountInfo] = sortedCategoryInfoList.take(10) // 7. 返回 top10 品类 id top10 } } - 编写入口程序,如下:
import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object PracticeApp { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sc = new SparkContext(conf) // 1. 读取文件中的数据 val lineRDD: RDD[String] = sc.textFile("/Users/lzc/Desktop/user_visit_action.txt") // 2. 类型调整 val userVisitActionRDD: RDD[UserVisitAction] = lineRDD.map(line => { val splits: Array[String] = line.split("_") UserVisitAction( splits(0), splits(1).toLong, splits(2), splits(3).toLong, splits(4), splits(5), splits(6).toLong, splits(7).toLong, splits(8), splits(9), splits(10), splits(11), splits(12).toLong) }) // 调用Top10实现类完成对热门商品的统计 val categoryTop10: List[CategoryCountInfo] = CategoryTop10App.statCategoryTop10(sc, userVisitActionRDD) println(CategoryCountInfoList) // 停止sc sc.stop() } }
请各位自行完成Top10热门品类中每个品类的 Top10 活跃 Session 统计



