栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark-core的几个案例

spark-core的几个案例

spark-core的几个案例
  • 需求一:热门品类的top10
    • 方法一:分别得到品类的点击、下单、付款数量,再像sql操作一样使用cogroup关联起来
    • 方法二:将rdd添加缓存,且避免了cogroup
    • 方法三:
    • 方法四:采用累加器进行统计(代码量复杂但是效率高)
  • 需求二:统计页面的跳转率

需求一:热门品类的top10

Top10 热门品类:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

方法一:将各品类的数量求出来,在用cogroup关联(效率低)
方法二:将rdd缓存,union替代cogroup
方法三:将数据变成(品类,(点击数,下单数,支付数)),union后在groupByKey
方法四:方法三中的groupByKey使用累加器替代,避免shuffle

方法一:分别得到品类的点击、下单、付款数量,再像sql操作一样使用cogroup关联起来
package requirements

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object hotCategoryTop10_1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Top10")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)
    // Top10 热门品类:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
    // 方法一:将各品类的数量求出来,在用cogroup关联(效率低)

    // 点击数量
    val clickRDD: RDD[(String, Int)] = rdd.filter({
      line => {
        val data = line.split("_")
        data(6) != "-1"
      }
    }).map({
      line => {
        val data = line.split("_")
        (data(6), 1)
      }
    }).reduceByKey(_ + _)

    // 下单数量
    val orderRDD: RDD[(String, Int)] = rdd.filter({
      line => {
        val data = line.split("_")
        data(8) != "null"
      }
    }).flatMap({
      line => {
        val data = line.split("_")(8)
        // 将下单数中多个品类在单独取出,组成(品类,1)格式
        data.split(",").map((_, 1))
      }
    }).reduceByKey(_ + _)

    // 付款数量
    val payRDD: RDD[(String, Int)] = rdd.filter({
      line => {
        val data = line.split("_")
        data(10) != "null"
      }
    }).flatMap({
      line => {
        val data = line.split("_")(8)
        // 将下单数中多个品类在单独取出,组成(品类,1)格式
        data.split(",").map((_, 1))
      }
    }).reduceByKey(_ + _)

    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickRDD.cogroup(orderRDD, payRDD)
    // (String, (Iterable[Int], Iterable[Int], Iterable[Int])) => (String, (Int,Int,Int))
    
    // 取出迭代器中的数据
    val resultRDD: RDD[(String, (Int, Int, Int))] = cogroupRDD.mapValues({
      case (clickIter, orderIter, payIter) => {
        var clickCnt = 0
        if (clickIter.iterator.hasNext) {
          clickCnt = clickIter.iterator.next()
        }
        var orderCnt = 0
        if (orderIter.iterator.hasNext) {
          orderCnt = orderIter.iterator.next()
        }
        var payCnt = 0
        if (payIter.iterator.hasNext) {
          payCnt = payIter.iterator.next()
        }
        (clickCnt, orderCnt, payCnt)
      }
    })
    val result: Array[(String, (Int, Int, Int))] = resultRDD.sortBy(_._2, false).take(10)
    result.foreach(println)
    sc.stop()
  }
}

方法二:将rdd添加缓存,且避免了cogroup
package requirements

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object hotCategoryTop10_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Top10")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)
    // Top10 热门品类:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
    // 方法二:将rdd缓存,union替代cogroup
    rdd.cache()
    // 点击数量
    val clickRDD: RDD[(String, Int)] = rdd.filter({
      line => {
        val data = line.split("_")
        data(6) != "-1"
      }
    }).map({
      line => {
        val data = line.split("_")
        (data(6), 1)
      }
    }).reduceByKey(_ + _)
    // 下单数量
    val orderRDD: RDD[(String, Int)] = rdd.filter({
      line => {
        val data = line.split("_")
        data(8) != "null"
      }
    }).flatMap({
      line => {
        val data = line.split("_")(8)
        // 将下单数中多个品类在单独取出,组成(品类,1)格式
        data.split(",").map((_, 1))
      }
    }).reduceByKey(_ + _)
    // 付款数量
    val payRDD: RDD[(String, Int)] = rdd.filter({
      line => {
        val data = line.split("_")
        data(10) != "null"
      }
    }).flatMap({
      line => {
        val data = line.split("_")(8)
        // 将下单数中多个品类在单独取出,组成(品类,1)格式
        data.split(",").map((_, 1))
      }
    }).reduceByKey(_ + _)
    // (品类, 点击数) =》(品类, (点击数, 0, 0))
    val clictCntRDD: RDD[(String, (Int, Int, Int))] = clickRDD.map({
      case (catlog, cnt) => {
        (catlog, (cnt, 0, 0))
      }
    })
    // (品类, 点击数) =》(品类, (0, 下单数, 0))
    val orderCntRDD: RDD[(String, (Int, Int, Int))] = orderRDD.map({
      case (catlog, cnt) => {
        (catlog, (0, cnt, 0))
      }
    })
    // (品类, 点击数) =》(品类, (0, 0, 付款数))
    val payCntRDD: RDD[(String, (Int, Int, Int))] = payRDD.map({
      case (catlog, cnt) => {
        (catlog, (0, 0, cnt))
      }
    })
    val resultRDD: RDD[(String, (Int, Int, Int))] = clictCntRDD.union(orderCntRDD).union(payCntRDD)
    val result: RDD[(String, (Int, Int, Int))] = resultRDD.reduceByKey((ct1, ct2) => {
      (ct1._1 + ct2._1, ct1._2 + ct2._2, ct1._3 + ct2._3)
    })
    result.sortBy(_._2, false).take(10).foreach(println)
    sc.stop()
  }
}

方法三:
package requirements

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object hotCategoryTop10_3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Top10")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)
    // Top10 热门品类:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
    // 方法三:直接将数据转换成(品类,(点击数,下单数,付款数))
    // 一步到位直接装化成结果格式
    val resultRDD: RDD[(String, (Int, Int, Int))] = rdd.flatMap({
      line => {
        val data: Array[String] = line.split("_")
        if (data(6) != "-1") {
          List((data(6), (1, 0, 0)))
        }
        else if (data(8) != "null") {
          data(8).split(",").map((_, (0, 1, 0)))
        }
        else if (data(10) != "null") {
          data(10).split(",").map((_, (0, 0, 1)))
        } else Nil
      }
    })

    val result: RDD[(String, (Int, Int, Int))] = resultRDD.reduceByKey((ct1, ct2) => {
      (ct1._1 + ct2._1, ct1._2 + ct2._2, ct1._3 + ct2._3)
    })
    result.sortBy(_._2, false).take(10).foreach(println)
    sc.stop()
  }
}

方法四:采用累加器进行统计(代码量复杂但是效率高)
package hotCategoryTop10

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object hotCategoryTop10_4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Top10")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)
    // Top10 热门品类:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
    // 方法四:直接将数据转换成(品类,(点击数,下单数,付款数))
    //        自定义累加器

    //todo 实例化累加器,注册累加器
    val acc = new MyAcc()
    sc.register(acc, "myAcc")
    //todo 将数据打标签,传入累加器
    rdd.foreach(
      action => {
        val datas: Array[String] = action.split("_")
        if (datas(6) != "-1") {
          // 点击的场合
          acc.add((datas(6), "click"))
        } else if (datas(8) != "null") {
          // 下单的场合
          val ids = datas(8).split(",")
          ids.foreach(
            id => {
              acc.add((id, "order"))
            }
          )
        } else if (datas(10) != "null") {
          // 支付的场合
          val ids = datas(10).split(",")
          ids.foreach(
            id => {
              acc.add((id, "pay"))
            }
          )
        }
      }
    )
    //todo 取到累加后的结果
    val result: mutable.Map[String, (Int, Int, Int)] = acc.value
    //todo 将数据进行排序
    val sortResult: List[(String, (Int, Int, Int))] = result.toList.sortWith(
      (left: (String, (Int, Int, Int)), right: (String, (Int, Int, Int))) => {
        if (left._2._1 > right._2._1) {
          true
        } else if (left._2._1 == right._2._1) {
          if (left._2._2 > right._2._2) {
            true
          } else if (left._2._2 == right._2._2) {
            left._2._3 > right._2._3
          } else {
            false
          }
        } else {
          false
        }
      }
    )
    sortResult.take(10).foreach(println)
    sc.stop()
  }
}

// 自定义累加器
class MyAcc() extends AccumulatorV2[(String, String), mutable.Map[String, (Int, Int, Int)]] {
  //todo 定义一个map用于记传递数据
  private val map: mutable.Map[String, (Int, Int, Int)] = mutable.Map[String, (Int, Int, Int)]()

  override def isZero: Boolean = {
    map.isEmpty
  }

  override def copy(): AccumulatorV2[(String, String), mutable.Map[String, (Int, Int, Int)]] = new MyAcc()

  override def reset(): Unit = map.clear()

  //todo 根据传入的数据标签进行计算,再对应的标识位置添加计数
  override def add(v: (String, String)): Unit = {
    val cid: String = v._1
    val actionType: String = v._2
    val newMap: (Int, Int, Int) = map.getOrElse(cid, (0, 0, 0))
    if (actionType == "click") {
      map.update(cid, (newMap._1 + 1, newMap._2, newMap._3))
    } else if (actionType == "order") {
      map.update(cid, (newMap._1, newMap._2 + 1, newMap._3))
    } else if (actionType == "pay") {
      map.update(cid, (newMap._1, newMap._2, newMap._3 + 1))
    }
    // 将map更新

  }

  override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, (Int, Int, Int)]]): Unit = {

    //todo 取别的map,将数据添加但当前定义map中
      other.value.foreach({   
      case (cid, (click, order, pay)) => {
        val newMap: (Int, Int, Int) = map.getOrElse(cid, (0, 0, 0))
        map.update(cid, (newMap._1 + click, newMap._2 + order, newMap._3 + pay))
      }
    })

  }

  override def value: mutable.Map[String, (Int, Int, Int)] = map
}

需求二:统计页面的跳转率
package hotCategoryTop10

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

case class UserVisitAction(
                            date: String, //用户点击行为的日期
                            user_id: Long, //用户的 ID
                            session_id: String, //Session 的 ID
                            page_id: Long, //某个页面的 ID
                            action_time: String, //动作的时间点
                            search_keyword: String, //用户搜索的关键词
                            click_category_id: Long, //某一个商品品类的 ID
                            click_product_id: Long, //某一个商品的 ID
                            order_category_ids: String, //一次订单中所有品类的 ID 集合
                            order_product_ids: String, //一次订单中所有商品的 ID 集合
                            pay_category_ids: String, //一次支付中所有品类的 ID 集合
                            pay_product_ids: String, //一次支付中所有商品的 ID 集合
                            city_id: Long)

object PageFlow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Top10")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt", 6)

    val actionRDD: RDD[UserVisitAction] = rdd.map(line => {
      val data: Array[String] = line.split("_")
      UserVisitAction(data(0), data(1).toLong, data(2), data(3).toLong, data(4), data(5), data(6).toLong, data(7).toLong, data(8), data(9), data(10), data(11), data(12).toLong
      )
    })
    actionRDD.cache()
    // 每个页面出现的次数
    // todo 1、计算分母
    val pageCnt: Map[Long, Long] = actionRDD.map(
      line => {
        (line.page_id, 1L)
      }
    ).reduceByKey(_ + _).collect().toMap
    // todo 2、计算分子:先计算不同session的页面跳转情况,再将跳转情况装化成((初始页面,目标页面),跳转次数)
    val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionRDD.groupBy(_.session_id)
    val mvPageRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(iter => {
      val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
      val flowIds: List[Long] = sortList.map(_.page_id)
      //tail 返回一个列表,包含除了第一元素之外的其他元素
      val pageFlowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
      pageFlowIds.map((_, 1))
    })
    val pageFlow: RDD[((Long, Long), Int)] = mvPageRDD.map(_._2).flatMap(list => list).reduceByKey(_ + _)
    pageFlow.foreach({
      case ((page1, page2), cnt) => {
        // 得到分母(初始页面)的出现的次数
        val page1Times: Long = pageCnt.getOrElse(page1, 0L)
        println(s"页面${page1Times}跳转到页面${page2}的转换率为${cnt.toDouble / page1Times.toDouble}%")
      }
    })
    sc.stop()
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/329511.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号