栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark进阶(四):RDD使用

spark进阶(四):RDD使用

Spark提供了一种对数据的核心抽象,称为弹性分布式数据集(ResilientDistributed Dataset,RDD)。这个数据集的全部或部分可以缓存在内存中,并且可以在多次计算时重用。RDD其实就是一个分布在多个节点上的数据集合。

RDD的主要特征如下:

  • RDD是不可变的,但可以将RDD转换成新的RDD进行操作。
  • RDD是可分区的。RDD由很多分区组成,每个分区对应一个Task任务来执行(关于分区将在3.4节详细讲解)。
  • 对RDD进行操作,相当于对RDD的每个分区进行操作。
  • RDD拥有一系列对分区进行计算的函数,称为算子(关于算子将在3.3节详细讲解)。
  • RDD之间存在依赖关系,可以实现管道化,避免了中间数据的存储。
一、测试数据说明

知名的电影推荐数据集:MovieLen 官网https://grouplens.org/datasets/movielens/

这里选用小数据集进行测试,逻辑写好了,可以再去尝试大的数据集。

数据集一共三个文件:

  • movies.dat
  • ratings.dat
  • users.dat
ratings.dat

为电影等级评定文件, 字段内容如下:

UserID::MovieID::Rating::Timestamp
用户ID::电影ID::评分::时间戳
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
  • 每个用户至少20个评级
  • 评分:1-5分
movies.dat

为电影文件, 字段内容如下:

MovieID::Title::Genres
电影ID::电影名::电影类型
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance

其中电影类型为下面情况:

Action   行动
Adventure   冒险
Animation   动画片
Children's   孩子们的
Comedy   喜剧
Crime   犯罪
documentary   纪录片
Drama   戏剧
Fantasy   幻想
Film-Noir   黑色电影
Horror   恐怖
Musical   音乐剧
Mystery   谜底
Romance   浪漫
Sci-Fi   科幻
Thriller   惊悚
War   战争
Western   西方
users.dat

用户文件,相关字段内容如下:

UserID::Gender::Age::Occupation::Zip-code
用户ID::性别::年龄::职业::邮编
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117

年龄代表的是一个范围:

 1:   "1-18"
18:  "18-24"
25:  "25-34"
35:  "35-44"
45:  "45-49"
50:  "50-55"
56:  "56+"

职业是枚举编号,对应关系为:

 0:  "other" or not specified      “其他”或未指定    
 1:  "academic/educator"      “学者/教育家”    
 2:  "artist"      “艺术家”    
 3:  "clerical/admin"      “文员/管理员”    
 4:  "college/grad student"      “大学生/研究生”    
 5:  "customer service"      “客户服务”    
 6:  "doctor/health care"      “医生/保健”    
 7:  "executive/managerial"      “执行/管理”    
 8:  "farmer"      “农民”    
 9:  "homemaker"      “家庭主妇”    
10:  "K-12 student"      “K-12 学生”    
11:  "lawyer"      “律师”    
12:  "programmer"      “程序员”    
13:  "retired"      “退休”    
14:  "sales/marketing"      “销售与市场营销”    
15:  "scientist"      “科学家”    
16:  "self-employed"      “自雇人士”    
17:  "technician/engineer"      “技术员/工程师”    
18:  "tradesman/craftsman"      “商人/工匠”    
19:  "unemployed"      “失业”    
20:  "writer"      “作家”    
二、RDD使用练习

1.获取电影评价top10

  • 首先对数据进行切分,获取到电影ID出现情况
  • 然后根据电影ID做聚合处理
  • 通过调换电影ID和出现次数将出现次数换到key的位置
  • 最后根据出现次数左排序处理
  • 打印出前十个出现情况
object MovieLen {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val dataPath = "/home/ffzs/data/ml-1m"

    val conf = new SparkConf()
    conf.setAppName("movieLen")
    conf.setMaster("local[*]")

    val sc = new SparkContext(conf)
    
    val ratingsRdd = sc.textFile(f"$dataPath/ratings.dat")
    // 电影评价次数排名 top10
    ratingsRdd.map(_.split("::"))  // 切分数据
      .map(_(1) -> 1)  // 获取id -> 1 的key-value
      .reduceByKey(_+_)  // 对电影id进行聚合计算出每个电影出现的次数
      .map(it => (it._2, it._1))   // 调整key-value位置为了对出现次数做排序处理
      .sortByKey(false)   // 对出现次数进行排序
      .take(10)   // 获取前10个值
      .foreach(println)
  }
}

结果输出:

2.获取口碑top10
  • 通过电影ID做聚合求的电影的总分和总访问次数
  • 然后对每个电影求平均分
  • 最后排序输出top10
    println("电影口碑(评分)排名 top10")
    ratingsRdd.map(_.split("::"))
      .map(it => (it(1), (it(2).toDouble, 1)))  // 评分转化为double类型方便做除法计算
      .reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))  // 通过电影的对总分和总观看次数做聚合
      .map(it => ((it._2._1/it._2._2), it._1))  // 通过总分和总次数求得评论的平均分
      .sortByKey(false)  // 通过评论的平均分做排序处理
      .take(10)  // 获取top10
      .foreach(println)

输出结果:

3.男性中口碑最好电影top10
  • 对评分和用户性别做join处理
  • 然后根据性别做数据筛选
  • 最后根据上面的口碑逻辑选出top10
    val gender = "M"
    val genderMap:Map[String, String] = Map("M"->"男性", "F"->"女性")
    println(f"最受${genderMap(gender)}欢迎的电影 top10:")
    ratingsRdd.map(_.split("::"))
      .map(x => (x(0), (x(0), x(1), x(2))))
      .join(   // 对评分和用户的性别根据用户ID做join处理
        usersRdd.map(_.split("::"))
          .map(x=> x(0)->x(1))
      )
      .filter(_._2._2.equals(gender))   // 筛选出对应的性别
      .map(it => (it._2._1._2, (it._2._1._3.toDouble, 1)))   // 求平均分做排序
      .reduceByKey((x,y) => (x._1+y._1, x._2+y._2))
      .map(it => (it._2._1/it._2._2, it._1))
      .sortByKey(ascending = false)
      .map(it => it._2->it._1)
      .take(10)
      .foreach(println)

输出结果为:

4.评分根据时间进行二次排序
  • 构建通过评分和时间的二次排序处理类,评分和时间都是降序
  • 通过排序类为key做排序处理
  • 然后再输出每一行数据即可
class SecondSortKey(val first:Int, val second:Int) extends Ordered[SecondSortKey] with Serializable {
  override def compare(that: SecondSortKey): Int = {
    if (this.first != that.first) {
      this.first-that.first
    }
    else{
      this.second-that.second
    }
  }
}
  ratingsRdd.map(line => {
    val row = line.split("::")
    ((new SecondSortKey(row(2).toInt, row(3).toInt)), line)
  })
    .sortByKey(false)
    .map(_._2)
    .take(10)
    .foreach(println)
}

5.电影类型top10
  • 通过电影中的电影类型通过flatmap进行分割
  • 然后统计每个类型出现次数
  • 最后排序之后输出前10个
println("电影类型 top10")
movieRdd.map(_.split("::")(2))
  .flatMap(_.split("\|"))
  .map((_, 1))
  .reduceByKey(_+_)
  .map(it => (it._2, it._1))
  .sortByKey(ascending = false)
  .map(it=> it._2->it._1)
  .take(10)
  .foreach(println)

输出:

6.每日新增用户
  • 首先将时间戳转化为日期
  • 然后通用户ID分组获取每一组中最小的日期,即为用户新增日期
  • 然后再对该日期做聚合
  • 最后进行排序输出
println("每日新增用户 top10")
val sdf = new SimpleDateFormat("yyyy-MM-dd")
ratingsRdd.map(_.split("::"))
  .map(it => (it(0), it(3).toLong*1000))
  .map(it => (it._1, sdf.format(it._2)))
  .groupByKey()
  .map(it => (it._2.min, 1))
  .reduceByKey(_+_)
  .map(it => it._2 -> it._1)
  .sortByKey(ascending = false)
  .map(it => it._2 -> it._1)
  .take(10)
  .foreach(println)

输出结果为:

完整代码:

object MovieLen {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val dataPath = "/home/ffzs/data/ml-1m"

    val conf = new SparkConf()
    conf.setAppName("movieLen")
    conf.setMaster("local[*]")

    val sc = new SparkContext(conf)
    val ratingsRdd = sc.textFile(f"$dataPath/ratings.dat")
    val usersRdd = sc.textFile(f"$dataPath/users.dat")
    val movieRdd = sc.textFile(f"$dataPath/movies.dat")
    
    println("电影评价次数排名 top10")
    ratingsRdd.map(_.split("::"))  // 切分数据
      .map(_(1) -> 1)  // 获取id -> 1 的key-value
      .reduceByKey(_+_)  // 对电影id进行聚合计算出每个电影出现的次数
      .map(it => (it._2, it._1))   // 调整key-value位置为了对出现次数做排序处理
      .sortByKey(false)   // 对出现次数进行排序
      .take(10)   // 获取前10个值
      .foreach(println)

    println("电影口碑(评分)排名 top10")
    ratingsRdd.map(_.split("::"))
      .map(it => (it(1), (it(2).toDouble, 1)))  // 评分转化为double类型方便做除法计算
      .reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))  // 通过电影的对总分和总观看次数做聚合
      .map(it => ((it._2._1/it._2._2), it._1))  // 通过总分和总次数求得评论的平均分
      .sortByKey(false)  // 通过评论的平均分做排序处理
      .take(10)  // 获取top10
      .foreach(println)

    val gender = "M"
    val genderMap:Map[String, String] = Map("M"->"男性", "F"->"女性")
    println(f"最受${genderMap(gender)}欢迎的电影 top10:")
    ratingsRdd.map(_.split("::"))
      .map(x => (x(0), (x(0), x(1), x(2))))
      .join(   // 对评分和用户的性别根据用户ID做join处理
        usersRdd.map(_.split("::"))
          .map(x=> x(0)->x(1))
      )
      .filter(_._2._2.equals(gender))   // 筛选出对应的性别
      .map(it => (it._2._1._2, (it._2._1._3.toDouble, 1)))   // 求平均分做排序
      .reduceByKey((x,y) => (x._1+y._1, x._2+y._2))
      .map(it => (it._2._1/it._2._2, it._1))
      .sortByKey(ascending = false)
      .map(it => it._2->it._1)
      .take(10)
      .foreach(println)

    println("评分时间二次排序: ")
    ratingsRdd.map(line => {
      val row = line.split("::")
      ((new SecondSortKey(row(2).toInt, row(3).toInt)), line)
    })
      .sortByKey(false)
      .map(_._2)
      .take(5)
      .foreach(println)

    println("电影类型 top10")
    movieRdd.map(_.split("::")(2))
      .flatMap(_.split("\|"))
      .map((_, 1))
      .reduceByKey(_+_)
      .map(it => (it._2, it._1))
      .sortByKey(ascending = false)
      .map(it=> it._2->it._1)
      .take(10)
      .foreach(println)

    println("每日新增用户 top10")
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    ratingsRdd.map(_.split("::"))
      .map(it => (it(0), it(3).toLong*1000))
      .map(it => (it._1, sdf.format(it._2)))
      .groupByKey()
      .map(it => (it._2.min, 1))
      .reduceByKey(_+_)
      .map(it => it._2 -> it._1)
      .sortByKey(ascending = false)
      .map(it => it._2 -> it._1)
      .take(10)
      .foreach(println)
  }
}

class SecondSortKey(val first:Int, val second:Int) extends Ordered[SecondSortKey] with Serializable {
  override def compare(that: SecondSortKey): Int = {
    if (this.first != that.first) {
      this.first-that.first
    }
    else{
      this.second-that.second
    }
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/304656.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号