栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark实现简单需求案例

spark实现简单需求案例

需求案例 数据格式
* 编号	字段名称			字段类型		字段含义
 * 1	date				String		用户点击行为的日期
 * 2	user_id				Long		用户的ID
 * 3	session_id			String		Session的ID
 * 4	page_id				Long		某个页面的ID
 * 5	action_time			String		动作的时间点
 * 6	search_keyword		String		用户搜索的关键词
 * 7	click_category_id	Long		某一个商品品类的ID
 * 8	click_product_id	Long		某一个商品的ID
 * 9	order_category_ids	String		一次订单中所有品类的ID集合
 * 10	order_product_ids	String		一次订单中所有商品的ID集合
 * 11	pay_category_ids	String		一次支付中所有品类的ID集合
 * 12	pay_product_ids		String		一次支付中所有商品的ID集合
 * 13	city_id				Long		城市 id
需求一

统计top10热门商品品类,

先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

package com.pihao.spark.requirement

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.immutable.StringOps
import scala.collection.mutable.ArrayOps


object Test1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("CategoryTop10")
    val sc = new SparkContext(conf)
    // 1.获取原始数据文件
    val fileDatas: RDD[String] = sc.textFile("data/user_visit_action.txt")
    // 2.统计品类的点击量
    //2.1统计前需要将不需要的数据过滤掉,保留是点击的数据
    val clickDatas: RDD[String] = fileDatas.filter(
      line => {
        val datas: Array[String] = line.split("_")
        val cid: String = datas(6)
        cid != "-1" //click_category_id不为-1则表示点击的
      }
    )
    //2.2获取所有品类的点击量,(cid,clickSum)
    val clickCntDatas: RDD[(String, Int)] = clickDatas.map(
      clickData => {
        val datas: Array[String] = clickData.split("_")
        (datas(6), 1) //将数据组装为(cid,1)的格式
      }
    ).reduceByKey(_ + _)
    // 3.统计品类的下单数
    //3.1统计前需要将不需要的数据过滤掉,保留是下单的数据
    val orderDatas: RDD[String] = fileDatas.filter(
      line => {
        val datas: Array[String] = line.split("_")
        val cid: String = datas(8)
        cid != "null" //order_category_ids不为null则表示下单的
      }
    )
    //3.2获取所有品类的下单量
    //目前orderDatas的数据格式为((cid1,cid2,cid3),1)
    val orderCntDatas: RDD[(String, Int)] = orderDatas.flatMap(
      orderData => {
        val datas: Array[String] = orderData.split("_")
        val cid: StringOps = datas(8)
        val cids: ArrayOps.ofRef[String] = cid.split(",")
        cids.map((_, 1))
      }
    ).reduceByKey(_ + _)

    //4.统计品类的支付数
    //4.1统计前需要将不需要的数据过滤掉,保留是支付的数据
    val payDatas: RDD[String] = fileDatas.filter(
      line => {
        val datas: Array[String] = line.split("_")
        val cid: String = datas(10)
        cid != "null" //order_category_ids不为null则表示下单的
      }
    )
    //4.2获取所有品类的支付数
    val payCntDatas: RDD[(String, Int)] = payDatas.flatMap(
      payData => {
        val datas: ArrayOps.ofRef[String] = payData.split("_")
        val cid: StringOps = datas(10)
        val cids: ArrayOps.ofRef[String] = cid.split(",")
        cids.map((_, 1))
      }
    ).reduceByKey(_ + _)

    //ok,现在各个品类的点击数,下单数,支付数都统计出来了?那么怎么将他们进行排序呢
    //规则:先根据点击数排序,点击数相同,在根据下单数,下单数相同在根据支付数
    //这里需要讲一种数据类型:元组,(点击数,下单数,支付数),它的排序就是这样这个的顺序
    //那么就需要将clickCntDatas,orderCntDatas,payCntDatas转成一个元组,使用cogroup
    //数据格式是 (cid,(点击数,下单数,支付数))
    val cidCntDatas: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
    clickCntDatas.cogroup(orderCntDatas, payCntDatas)

    //获取所有组装好的的数据(cid,(sum1,sum2,sum3))
    val mapDatas: RDD[(String, (Int, Int, Int))] = cidCntDatas.map {
      case (key, (clickIter, orderIter, payIter)) =>
        (key, (clickIter.iterator.next(), orderIter.iterator.next(), payIter.iterator.next()))
    }
    //排序获取top10
    val top10: Array[(String, (Int, Int, Int))] = mapDatas.sortBy(_._2,false).take(10)


    //将top10打印出来
    top10.foreach(println)
    sc.stop()
  }

}

第二种方式

第一种方式缺点

//1.同一个RDD重复使用。解决(加缓存) 
fileDatas.persist(StorageLevel.MEMORY_AND_DISK)
//cogroup算子效率地下,可能存在笛卡尔乘积,可能shuffle
换一种思路,使用reduceByKey
package com.pihao.spark.requirement

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object Test2 {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10")
        val sc = new SparkContext(conf)

        // TODO 需求一 : Top10热门品类
        // TODO (找茬) :
        // 1. 同一个RDD的重复使用
        // 2. cogroup算子可能性能底下

        // TODO 读取文件,获取原始数据
        val fileDatas = sc.textFile("data/user_visit_action.txt")
        fileDatas.persist(StorageLevel.MEMORY_AND_DISK) //加上缓存优化性能

        // TODO 统计品类的点击数量
        // 统计分区前需要将不需要的数据过滤掉
        // 先保留所有的点击数据
        val clickDatas = fileDatas.filter(
            data => {
                val datas = data.split("_")
                val cid = datas(6)
                cid != "-1"
            }
        )
        // 对点击数据进行统计
        val clickCntDatas = clickDatas.map(
            data => {
                val datas = data.split("_")
                val cid = datas(6)
                (cid, 1)
            }
        ).reduceByKey(_+_)

        // TODO 统计品类的下单数量
        // 先保留所有的下单数据
        val orderDatas = fileDatas.filter(
            data => {
                val datas = data.split("_")
                val cid = datas(8)
                cid != "null"
            }
        )
        // 对下单数据进行统计
        // (1,2,3,4) => ((1,2,3,4), 1)
        // 1, 2, 3, 4
        // (1,1),(2,1),(3,1),(4,1)
        val orderCntDatas = orderDatas.flatMap(
            data => {
                val datas = data.split("_")
                val cid = datas(8)
                val cids = cid.split(",")
                cids.map((_, 1))
            }
        ).reduceByKey(_+_)
        // TODO 统计品类的支付数量
        // 先保留所有的支付数据
        val payDatas = fileDatas.filter(
            data => {
                val datas = data.split("_")
                val cid = datas(10)
                cid != "null"
            }
        )
        // 对下单数据进行统计
        val payCntDatas = payDatas.flatMap(
            data => {
                val datas = data.split("_")
                val cid = datas(10)
                val cids = cid.split(",")
                cids.map((_, 1))
            }
        ).reduceByKey(_+_)

        // 从这开始
        // TODO 对统计结果进行排序 => Tuple(点击,下单,支付)
        // (品类ID, 点击) => ( 品类ID, ( 点击,0,0 ) )
        // (品类ID, 下单) => ( 品类ID, ( 0,下单,0 ) )
        // (品类ID, 支付) => ( 品类ID, ( 0,0,支付 ))
        // (品类ID, ( 点击,下单,支付 ))
        // 3 => 1 => (聚合)
        // reduceByKey
        // (( 点击,下单,支付 ), ( 点击,下单,支付 )) => ( 点击,下单,支付 )
        val clickMapDatas = clickCntDatas.map {
            case ( cid, cnt ) => {
                ( cid, (cnt, 0, 0) )
            }
        }
        val orderMapDatas = orderCntDatas.map {
            case ( cid, cnt ) => {
                ( cid, (0, cnt, 0) )
            }
        }
        val payMapDatas = payCntDatas.map {
            case ( cid, cnt ) => {
                ( cid, (0, 0, cnt) )
            }
        }

        val unionRDD: RDD[(String, (Int, Int, Int))] = clickMapDatas.union(orderMapDatas).union(payMapDatas)

        val reduceRDD = unionRDD.reduceByKey(
            (t1, t2) => {
                ( t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3 )
            }
        )

        val top10 = reduceRDD.sortBy(_._2, false).take(10)


        // TODO 将结果采集后打印再控制台上
        top10.foreach(println)

        sc.stop()

    }
}

第三种方式

第二种方式shuffle太多了,中间很多落盘操作,严重影响性能

package com.pihao.spark.requirement

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test3 {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10")
        val sc = new SparkContext(conf)

        // TODO 需求一 : Top10热门品类
        // TODO (找茬) :

        // TODO 读取文件,获取原始数据
        val fileDatas = sc.textFile("data/user_visit_action.txt")

        val flatDatas = fileDatas.flatMap(
            data => {
                var datas = data.split("_")
                if ( datas(6) != "-1" ) {
                    // 点击数据的场合
                    List((datas(6), (1, 0, 0)))
                } else if ( datas(8) != "null" ) {
                    // 下单数据的场合
                    val id = datas(8)
                    val ids = id.split(",")
                    ids.map(
                        id => {
                            (id, (0, 1, 0))
                        }
                    )
                } else if ( datas(10) != "null" ) {
                    // 支付数据的场合
                    val id = datas(10)
                    val ids = id.split(",")
                    ids.map(
                        id => {
                            (id, (0, 0, 1))
                        }
                    )
                } else {
                    Nil
                }
            }
        )

        val top10 = flatDatas.reduceByKey(
            (t1, t2) => {
                ( t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3 )
            }
        ).sortBy(_._2, false).take(10)

        // TODO 将结果采集后打印再控制台上
        top10.foreach(println)

        sc.stop()

    }
}

第四种方式
package com.pihao.spark.requirement

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Test4 {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10")
        val sc = new SparkContext(conf)

        val fileDatas = sc.textFile("data/user_visit_action.txt")

        // 创建累加器对象
        val acc = new HotCategoryAccumulator()
        // 注册累加器
        sc.register(acc, "HotCategory")

        fileDatas.foreach(
            data => {
                val datas = data.split("_")
                if ( datas(6) != "-1" ) {
                    // 点击的场合
                    acc.add( (datas(6), "click") )
                } else if ( datas(8) != "null" ) {
                    // 下单的场合
                    val id = datas(8)
                    val ids = id.split(",")
                    ids.foreach(
                        id => {
                            acc.add( (id, "order") )
                        }
                    )
                } else if ( datas(10) != "null" ) {
                    // 支付的场合
                    val id = datas(10)
                    val ids = id.split(",")
                    ids.foreach(
                        id => {
                            acc.add( (id, "pay") )
                        }
                    )
                }
            }
        )

        // TODO 获取累加器的结果
        val resultMap: mutable.Map[String, HotCategoryCount] = acc.value
        val top10 = resultMap.map(_._2).toList.sortWith(
            (left, right) => {
                if ( left.clickCnt > right.clickCnt ) {
                    true
                } else if ( left.clickCnt == right.clickCnt ) {
                    if ( left.orderCnt > right.orderCnt ) {
                        true
                    } else if ( left.orderCnt == right.orderCnt ) {
                        left.payCnt > right.payCnt
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
        ).take(10)

        top10.foreach(println)

        sc.stop()

    }
    case class HotCategoryCount( cid:String, var clickCnt : Int, var orderCnt : Int, var payCnt : Int )
    // TODO 自定义热门点击累加器
    //  1. 继承AccumulatorV2
    //  2. 定义泛型
    //     IN : (品类ID,行为类型)
    //     OUT : Map[品类ID, HotCategoryCount]
    //  3. 重写方法 (3 + 3)
    class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]]{

        private val map = mutable.Map[String, HotCategoryCount]()

        override def isZero: Boolean = {
            map.isEmpty
        }

        override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]] = {
            new HotCategoryAccumulator()
        }

        override def reset(): Unit = {
            map.clear()
        }

        override def add(v: (String, String)): Unit = {
            val (cid, actionType) = v
            val hcc: HotCategoryCount = map.getOrElse(cid, HotCategoryCount(cid, 0, 0, 0))
            actionType match {
                case "click" => hcc.clickCnt += 1
                case "order" => hcc.orderCnt += 1
                case "pay" => hcc.payCnt += 1
            }
            map.update(cid, hcc)
        }

        override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategoryCount]]): Unit = {
            other.value.foreach {
                case ( cid, otherHCC ) => {
                    val thisHCC: HotCategoryCount = map.getOrElse(cid, HotCategoryCount(cid, 0, 0, 0))
                    thisHCC.clickCnt += otherHCC.clickCnt
                    thisHCC.orderCnt += otherHCC.orderCnt
                    thisHCC.payCnt += otherHCC.payCnt
                    map.update(cid, thisHCC)
                }
            }
        }

        override def value: mutable.Map[String, HotCategoryCount] = {
            map
        }
    }
}

需求二

Top10热门品类中每个品类的top10活跃session统计,

package com.pihao.spark.requirement

import com.pihao.bigdata.spark.summer.bean.UserVisitAction
import com.pihao.bigdata.spark.summer.common.TService
import com.pihao.bigdata.spark.summer.dao.{HotCategoryTop10Dao, HotCategoryTop10SessionDao}
import org.apache.spark.rdd.RDD

class Test5 extends TService {

    private val hotCategoryTop10SessionDao = new HotCategoryTop10SessionDao

    override def analysis( data : Any ) = {
        //之前需求一统计出来的的top10的id
        val topIds: Array[String] = data.asInstanceOf[Array[String]]

        val fileDatas = hotCategoryTop10SessionDao.readFileBySpark("data/user_visit_action.txt")

        val actionDatas = fileDatas.map(
            data => {
                val datas = data.split("_")
                UserVisitAction(
                    datas(0),
                    datas(1).toLong,
                    datas(2),
                    datas(3).toLong,
                    datas(4),
                    datas(5),
                    datas(6).toLong,
                    datas(7).toLong,
                    datas(8),
                    datas(9),
                    datas(10),
                    datas(11),
                    datas(12).toLong
                )
            }
        )

        val clickDatas = actionDatas.filter {
            data => {
                if ( data.click_category_id != -1 ) {
                    topIds.contains(data.click_category_id.toString) // toString一定要转换
                } else {
                    false
                }
            }
        }

        val reduceDatas = clickDatas.map(
            data => {
                (( data.click_category_id, data.session_id ), 1)
            }
        ).reduceByKey(_+_)

        val groupDatas: RDD[(Long, Iterable[(String, Int)])] = reduceDatas.map {
            case ((cid, sid), cnt) => {
                (cid, (sid, cnt))
            }
        }.groupByKey()

        groupDatas.mapValues(
            iter => {
                iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
            }
        ).collect()
    }
}

需求三

页面单跳转换率统计

只统计【1,2,3,4,5,6,7】号页面的

package com.pihao.spark.requirement

import com.pihao.bigdata.spark.summer.bean.UserVisitAction
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test6  {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("Pageflow")
        val sc = new SparkContext(conf)

        val fileDatas = sc.textFile("data/user_visit_action.txt")

        val actionDatas = fileDatas.map(
            data => {
                val datas = data.split("_")
                UserVisitAction(
                    datas(0),
                    datas(1).toLong,
                    datas(2),
                    datas(3).toLong,
                    datas(4),
                    datas(5),
                    datas(6).toLong,
                    datas(7).toLong,
                    datas(8),
                    datas(9),
                    datas(10),
                    datas(11),
                    datas(12).toLong
                )
            }
        )
        actionDatas.cache()

        // 【1,2,3,4,5,6,7】
        val okIds = List(1,2,3,4,5,6,7)
        // 【(1,2),(2,3)】
        val okFlowIds = okIds.zip(okIds.tail)  

        // TODO 分母的计算
        val result: Map[Long, Int] = actionDatas.filter(
            action => { //分母过滤下,分母只能是1-6号页面
                okIds.init.contains(action.page_id.toInt) //使用contains,要确保类型一致
            }
        ).map(
            action => {
                (action.page_id, 1)
            }
        ).reduceByKey(_ + _).collect().toMap 	//求出每个页面的访问量

        // TODO 分子的计算
        //  将数据按照session进行分组
        val groupRDD: RDD[(String, Iterable[UserVisitAction])] = actionDatas.groupBy(_.session_id)

        // 将分组后的数据进行组内排序
        val mapRDD = groupRDD.mapValues(
            iter => {
                val actions: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
                //【1,2,3,4,5,6,7】
                //【2,3,4,5,6,7】
                // 滑窗
                //【1-2,2-3,3-4,4-5,5-6,6-7】
                val ids: List[Int] = actions.map(_.page_id.toInt)

                //                val iterator: Iterator[List[Long]] = ids.sliding(2)
                //                while ( iterator.hasNext ) {
                //                    val longs: List[Long] = iterator.next()
                //                    (longs.head, longs.last)
                //                }
                val flowIds: List[(Int, Int)] = ids.zip(ids.tail)

                flowIds.filter(
                    ids => {
                        okFlowIds.contains(ids)
                    }
                ) //最后得到的结果就是(sessioniId,(1,2)),(sessionId,2,3)...
            }
        )
        val mapRDD2 = mapRDD.map(_._2)
        val flatRDD = mapRDD2.flatMap(list => list)

        // 分子计算完毕
        val reduceRDD = flatRDD.map((_, 1)).reduceByKey(_ + _)//((1,2),100)
        // TODO 单挑转换率的统计
        reduceRDD.foreach {
            case ( (id1, id2), cnt ) => {
                println(s"页面【${id1}-${id2}】单挑转换率为 :" + ( cnt.toDouble / result.getOrElse(id1, 1) ))
            }
        }


        sc.stop()

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433945.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号