* 编号 字段名称 字段类型 字段含义 * 1 date String 用户点击行为的日期 * 2 user_id Long 用户的ID * 3 session_id String Session的ID * 4 page_id Long 某个页面的ID * 5 action_time String 动作的时间点 * 6 search_keyword String 用户搜索的关键词 * 7 click_category_id Long 某一个商品品类的ID * 8 click_product_id Long 某一个商品的ID * 9 order_category_ids String 一次订单中所有品类的ID集合 * 10 order_product_ids String 一次订单中所有商品的ID集合 * 11 pay_category_ids String 一次支付中所有品类的ID集合 * 12 pay_product_ids String 一次支付中所有商品的ID集合 * 13 city_id Long 城市 id需求一
统计top10热门商品品类,
先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
package com.pihao.spark.requirement
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.immutable.StringOps
import scala.collection.mutable.ArrayOps
object Test1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("CategoryTop10")
val sc = new SparkContext(conf)
// 1.获取原始数据文件
val fileDatas: RDD[String] = sc.textFile("data/user_visit_action.txt")
// 2.统计品类的点击量
//2.1统计前需要将不需要的数据过滤掉,保留是点击的数据
val clickDatas: RDD[String] = fileDatas.filter(
line => {
val datas: Array[String] = line.split("_")
val cid: String = datas(6)
cid != "-1" //click_category_id不为-1则表示点击的
}
)
//2.2获取所有品类的点击量,(cid,clickSum)
val clickCntDatas: RDD[(String, Int)] = clickDatas.map(
clickData => {
val datas: Array[String] = clickData.split("_")
(datas(6), 1) //将数据组装为(cid,1)的格式
}
).reduceByKey(_ + _)
// 3.统计品类的下单数
//3.1统计前需要将不需要的数据过滤掉,保留是下单的数据
val orderDatas: RDD[String] = fileDatas.filter(
line => {
val datas: Array[String] = line.split("_")
val cid: String = datas(8)
cid != "null" //order_category_ids不为null则表示下单的
}
)
//3.2获取所有品类的下单量
//目前orderDatas的数据格式为((cid1,cid2,cid3),1)
val orderCntDatas: RDD[(String, Int)] = orderDatas.flatMap(
orderData => {
val datas: Array[String] = orderData.split("_")
val cid: StringOps = datas(8)
val cids: ArrayOps.ofRef[String] = cid.split(",")
cids.map((_, 1))
}
).reduceByKey(_ + _)
//4.统计品类的支付数
//4.1统计前需要将不需要的数据过滤掉,保留是支付的数据
val payDatas: RDD[String] = fileDatas.filter(
line => {
val datas: Array[String] = line.split("_")
val cid: String = datas(10)
cid != "null" //order_category_ids不为null则表示下单的
}
)
//4.2获取所有品类的支付数
val payCntDatas: RDD[(String, Int)] = payDatas.flatMap(
payData => {
val datas: ArrayOps.ofRef[String] = payData.split("_")
val cid: StringOps = datas(10)
val cids: ArrayOps.ofRef[String] = cid.split(",")
cids.map((_, 1))
}
).reduceByKey(_ + _)
//ok,现在各个品类的点击数,下单数,支付数都统计出来了?那么怎么将他们进行排序呢
//规则:先根据点击数排序,点击数相同,在根据下单数,下单数相同在根据支付数
//这里需要讲一种数据类型:元组,(点击数,下单数,支付数),它的排序就是这样这个的顺序
//那么就需要将clickCntDatas,orderCntDatas,payCntDatas转成一个元组,使用cogroup
//数据格式是 (cid,(点击数,下单数,支付数))
val cidCntDatas: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
clickCntDatas.cogroup(orderCntDatas, payCntDatas)
//获取所有组装好的的数据(cid,(sum1,sum2,sum3))
val mapDatas: RDD[(String, (Int, Int, Int))] = cidCntDatas.map {
case (key, (clickIter, orderIter, payIter)) =>
(key, (clickIter.iterator.next(), orderIter.iterator.next(), payIter.iterator.next()))
}
//排序获取top10
val top10: Array[(String, (Int, Int, Int))] = mapDatas.sortBy(_._2,false).take(10)
//将top10打印出来
top10.foreach(println)
sc.stop()
}
}
第二种方式
第一种方式缺点
//1.同一个RDD重复使用。解决(加缓存) fileDatas.persist(StorageLevel.MEMORY_AND_DISK) //cogroup算子效率地下,可能存在笛卡尔乘积,可能shuffle 换一种思路,使用reduceByKey
package com.pihao.spark.requirement
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Test2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10")
val sc = new SparkContext(conf)
// TODO 需求一 : Top10热门品类
// TODO (找茬) :
// 1. 同一个RDD的重复使用
// 2. cogroup算子可能性能底下
// TODO 读取文件,获取原始数据
val fileDatas = sc.textFile("data/user_visit_action.txt")
fileDatas.persist(StorageLevel.MEMORY_AND_DISK) //加上缓存优化性能
// TODO 统计品类的点击数量
// 统计分区前需要将不需要的数据过滤掉
// 先保留所有的点击数据
val clickDatas = fileDatas.filter(
data => {
val datas = data.split("_")
val cid = datas(6)
cid != "-1"
}
)
// 对点击数据进行统计
val clickCntDatas = clickDatas.map(
data => {
val datas = data.split("_")
val cid = datas(6)
(cid, 1)
}
).reduceByKey(_+_)
// TODO 统计品类的下单数量
// 先保留所有的下单数据
val orderDatas = fileDatas.filter(
data => {
val datas = data.split("_")
val cid = datas(8)
cid != "null"
}
)
// 对下单数据进行统计
// (1,2,3,4) => ((1,2,3,4), 1)
// 1, 2, 3, 4
// (1,1),(2,1),(3,1),(4,1)
val orderCntDatas = orderDatas.flatMap(
data => {
val datas = data.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map((_, 1))
}
).reduceByKey(_+_)
// TODO 统计品类的支付数量
// 先保留所有的支付数据
val payDatas = fileDatas.filter(
data => {
val datas = data.split("_")
val cid = datas(10)
cid != "null"
}
)
// 对下单数据进行统计
val payCntDatas = payDatas.flatMap(
data => {
val datas = data.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map((_, 1))
}
).reduceByKey(_+_)
// 从这开始
// TODO 对统计结果进行排序 => Tuple(点击,下单,支付)
// (品类ID, 点击) => ( 品类ID, ( 点击,0,0 ) )
// (品类ID, 下单) => ( 品类ID, ( 0,下单,0 ) )
// (品类ID, 支付) => ( 品类ID, ( 0,0,支付 ))
// (品类ID, ( 点击,下单,支付 ))
// 3 => 1 => (聚合)
// reduceByKey
// (( 点击,下单,支付 ), ( 点击,下单,支付 )) => ( 点击,下单,支付 )
val clickMapDatas = clickCntDatas.map {
case ( cid, cnt ) => {
( cid, (cnt, 0, 0) )
}
}
val orderMapDatas = orderCntDatas.map {
case ( cid, cnt ) => {
( cid, (0, cnt, 0) )
}
}
val payMapDatas = payCntDatas.map {
case ( cid, cnt ) => {
( cid, (0, 0, cnt) )
}
}
val unionRDD: RDD[(String, (Int, Int, Int))] = clickMapDatas.union(orderMapDatas).union(payMapDatas)
val reduceRDD = unionRDD.reduceByKey(
(t1, t2) => {
( t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
val top10 = reduceRDD.sortBy(_._2, false).take(10)
// TODO 将结果采集后打印再控制台上
top10.foreach(println)
sc.stop()
}
}
第三种方式
第二种方式shuffle太多了,中间很多落盘操作,严重影响性能
package com.pihao.spark.requirement
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10")
val sc = new SparkContext(conf)
// TODO 需求一 : Top10热门品类
// TODO (找茬) :
// TODO 读取文件,获取原始数据
val fileDatas = sc.textFile("data/user_visit_action.txt")
val flatDatas = fileDatas.flatMap(
data => {
var datas = data.split("_")
if ( datas(6) != "-1" ) {
// 点击数据的场合
List((datas(6), (1, 0, 0)))
} else if ( datas(8) != "null" ) {
// 下单数据的场合
val id = datas(8)
val ids = id.split(",")
ids.map(
id => {
(id, (0, 1, 0))
}
)
} else if ( datas(10) != "null" ) {
// 支付数据的场合
val id = datas(10)
val ids = id.split(",")
ids.map(
id => {
(id, (0, 0, 1))
}
)
} else {
Nil
}
}
)
val top10 = flatDatas.reduceByKey(
(t1, t2) => {
( t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
).sortBy(_._2, false).take(10)
// TODO 将结果采集后打印再控制台上
top10.foreach(println)
sc.stop()
}
}
第四种方式
package com.pihao.spark.requirement
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Test4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10")
val sc = new SparkContext(conf)
val fileDatas = sc.textFile("data/user_visit_action.txt")
// 创建累加器对象
val acc = new HotCategoryAccumulator()
// 注册累加器
sc.register(acc, "HotCategory")
fileDatas.foreach(
data => {
val datas = data.split("_")
if ( datas(6) != "-1" ) {
// 点击的场合
acc.add( (datas(6), "click") )
} else if ( datas(8) != "null" ) {
// 下单的场合
val id = datas(8)
val ids = id.split(",")
ids.foreach(
id => {
acc.add( (id, "order") )
}
)
} else if ( datas(10) != "null" ) {
// 支付的场合
val id = datas(10)
val ids = id.split(",")
ids.foreach(
id => {
acc.add( (id, "pay") )
}
)
}
}
)
// TODO 获取累加器的结果
val resultMap: mutable.Map[String, HotCategoryCount] = acc.value
val top10 = resultMap.map(_._2).toList.sortWith(
(left, right) => {
if ( left.clickCnt > right.clickCnt ) {
true
} else if ( left.clickCnt == right.clickCnt ) {
if ( left.orderCnt > right.orderCnt ) {
true
} else if ( left.orderCnt == right.orderCnt ) {
left.payCnt > right.payCnt
} else {
false
}
} else {
false
}
}
).take(10)
top10.foreach(println)
sc.stop()
}
case class HotCategoryCount( cid:String, var clickCnt : Int, var orderCnt : Int, var payCnt : Int )
// TODO 自定义热门点击累加器
// 1. 继承AccumulatorV2
// 2. 定义泛型
// IN : (品类ID,行为类型)
// OUT : Map[品类ID, HotCategoryCount]
// 3. 重写方法 (3 + 3)
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]]{
private val map = mutable.Map[String, HotCategoryCount]()
override def isZero: Boolean = {
map.isEmpty
}
override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]] = {
new HotCategoryAccumulator()
}
override def reset(): Unit = {
map.clear()
}
override def add(v: (String, String)): Unit = {
val (cid, actionType) = v
val hcc: HotCategoryCount = map.getOrElse(cid, HotCategoryCount(cid, 0, 0, 0))
actionType match {
case "click" => hcc.clickCnt += 1
case "order" => hcc.orderCnt += 1
case "pay" => hcc.payCnt += 1
}
map.update(cid, hcc)
}
override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]]): Unit = {
other.value.foreach {
case ( cid, otherHCC ) => {
val thisHCC: HotCategoryCount = map.getOrElse(cid, HotCategoryCount(cid, 0, 0, 0))
thisHCC.clickCnt += otherHCC.clickCnt
thisHCC.orderCnt += otherHCC.orderCnt
thisHCC.payCnt += otherHCC.payCnt
map.update(cid, thisHCC)
}
}
}
override def value: mutable.Map[String, HotCategoryCount] = {
map
}
}
}
需求二
Top10热门品类中每个品类的top10活跃session统计,
package com.pihao.spark.requirement
import com.pihao.bigdata.spark.summer.bean.UserVisitAction
import com.pihao.bigdata.spark.summer.common.TService
import com.pihao.bigdata.spark.summer.dao.{HotCategoryTop10Dao, HotCategoryTop10SessionDao}
import org.apache.spark.rdd.RDD
class Test5 extends TService {
private val hotCategoryTop10SessionDao = new HotCategoryTop10SessionDao
override def analysis( data : Any ) = {
//之前需求一统计出来的的top10的id
val topIds: Array[String] = data.asInstanceOf[Array[String]]
val fileDatas = hotCategoryTop10SessionDao.readFileBySpark("data/user_visit_action.txt")
val actionDatas = fileDatas.map(
data => {
val datas = data.split("_")
UserVisitAction(
datas(0),
datas(1).toLong,
datas(2),
datas(3).toLong,
datas(4),
datas(5),
datas(6).toLong,
datas(7).toLong,
datas(8),
datas(9),
datas(10),
datas(11),
datas(12).toLong
)
}
)
val clickDatas = actionDatas.filter {
data => {
if ( data.click_category_id != -1 ) {
topIds.contains(data.click_category_id.toString) // toString一定要转换
} else {
false
}
}
}
val reduceDatas = clickDatas.map(
data => {
(( data.click_category_id, data.session_id ), 1)
}
).reduceByKey(_+_)
val groupDatas: RDD[(Long, Iterable[(String, Int)])] = reduceDatas.map {
case ((cid, sid), cnt) => {
(cid, (sid, cnt))
}
}.groupByKey()
groupDatas.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
).collect()
}
}
需求三
页面单跳转换率统计
只统计【1,2,3,4,5,6,7】号页面的
package com.pihao.spark.requirement
import com.pihao.bigdata.spark.summer.bean.UserVisitAction
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test6 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Pageflow")
val sc = new SparkContext(conf)
val fileDatas = sc.textFile("data/user_visit_action.txt")
val actionDatas = fileDatas.map(
data => {
val datas = data.split("_")
UserVisitAction(
datas(0),
datas(1).toLong,
datas(2),
datas(3).toLong,
datas(4),
datas(5),
datas(6).toLong,
datas(7).toLong,
datas(8),
datas(9),
datas(10),
datas(11),
datas(12).toLong
)
}
)
actionDatas.cache()
// 【1,2,3,4,5,6,7】
val okIds = List(1,2,3,4,5,6,7)
// 【(1,2),(2,3)】
val okFlowIds = okIds.zip(okIds.tail)
// TODO 分母的计算
val result: Map[Long, Int] = actionDatas.filter(
action => { //分母过滤下,分母只能是1-6号页面
okIds.init.contains(action.page_id.toInt) //使用contains,要确保类型一致
}
).map(
action => {
(action.page_id, 1)
}
).reduceByKey(_ + _).collect().toMap //求出每个页面的访问量
// TODO 分子的计算
// 将数据按照session进行分组
val groupRDD: RDD[(String, Iterable[UserVisitAction])] = actionDatas.groupBy(_.session_id)
// 将分组后的数据进行组内排序
val mapRDD = groupRDD.mapValues(
iter => {
val actions: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
//【1,2,3,4,5,6,7】
//【2,3,4,5,6,7】
// 滑窗
//【1-2,2-3,3-4,4-5,5-6,6-7】
val ids: List[Int] = actions.map(_.page_id.toInt)
// val iterator: Iterator[List[Long]] = ids.sliding(2)
// while ( iterator.hasNext ) {
// val longs: List[Long] = iterator.next()
// (longs.head, longs.last)
// }
val flowIds: List[(Int, Int)] = ids.zip(ids.tail)
flowIds.filter(
ids => {
okFlowIds.contains(ids)
}
) //最后得到的结果就是(sessioniId,(1,2)),(sessionId,2,3)...
}
)
val mapRDD2 = mapRDD.map(_._2)
val flatRDD = mapRDD2.flatMap(list => list)
// 分子计算完毕
val reduceRDD = flatRDD.map((_, 1)).reduceByKey(_ + _)//((1,2),100)
// TODO 单挑转换率的统计
reduceRDD.foreach {
case ( (id1, id2), cnt ) => {
println(s"页面【${id1}-${id2}】单挑转换率为 :" + ( cnt.toDouble / result.getOrElse(id1, 1) ))
}
}
sc.stop()
}
}



