Spark提供了一种对数据的核心抽象,称为弹性分布式数据集(ResilientDistributed Dataset,RDD)。这个数据集的全部或部分可以缓存在内存中,并且可以在多次计算时重用。RDD其实就是一个分布在多个节点上的数据集合。
RDD的主要特征如下:
- RDD是不可变的,但可以将RDD转换成新的RDD进行操作。
- RDD是可分区的。RDD由很多分区组成,每个分区对应一个Task任务来执行(关于分区将在3.4节详细讲解)。
- 对RDD进行操作,相当于对RDD的每个分区进行操作。
- RDD拥有一系列对分区进行计算的函数,称为算子(关于算子将在3.3节详细讲解)。
- RDD之间存在依赖关系,可以实现管道化,避免了中间数据的存储。
知名的电影推荐数据集:MovieLen 官网https://grouplens.org/datasets/movielens/
这里选用小数据集进行测试,逻辑写好了,可以再去尝试大的数据集。
数据集一共三个文件:
- movies.dat
- ratings.dat
- users.dat
为电影等级评定文件, 字段内容如下:
UserID::MovieID::Rating::Timestamp 用户ID::电影ID::评分::时间戳 1::1193::5::978300760 1::661::3::978302109 1::914::3::978301968 1::3408::4::978300275
- 每个用户至少20个评级
- 评分:1-5分
为电影文件, 字段内容如下:
MovieID::Title::Genres 电影ID::电影名::电影类型 1::Toy Story (1995)::Animation|Children's|Comedy 2::Jumanji (1995)::Adventure|Children's|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance
其中电影类型为下面情况:
Action 行动 Adventure 冒险 Animation 动画片 Children's 孩子们的 Comedy 喜剧 Crime 犯罪 documentary 纪录片 Drama 戏剧 Fantasy 幻想 Film-Noir 黑色电影 Horror 恐怖 Musical 音乐剧 Mystery 谜底 Romance 浪漫 Sci-Fi 科幻 Thriller 惊悚 War 战争 Western 西方users.dat
用户文件,相关字段内容如下:
UserID::Gender::Age::Occupation::Zip-code 用户ID::性别::年龄::职业::邮编 1::F::1::10::48067 2::M::56::16::70072 3::M::25::15::55117
年龄代表的是一个范围:
1: "1-18" 18: "18-24" 25: "25-34" 35: "35-44" 45: "45-49" 50: "50-55" 56: "56+"
职业是枚举编号,对应关系为:
0: "other" or not specified “其他”或未指定 1: "academic/educator" “学者/教育家” 2: "artist" “艺术家” 3: "clerical/admin" “文员/管理员” 4: "college/grad student" “大学生/研究生” 5: "customer service" “客户服务” 6: "doctor/health care" “医生/保健” 7: "executive/managerial" “执行/管理” 8: "farmer" “农民” 9: "homemaker" “家庭主妇” 10: "K-12 student" “K-12 学生” 11: "lawyer" “律师” 12: "programmer" “程序员” 13: "retired" “退休” 14: "sales/marketing" “销售与市场营销” 15: "scientist" “科学家” 16: "self-employed" “自雇人士” 17: "technician/engineer" “技术员/工程师” 18: "tradesman/craftsman" “商人/工匠” 19: "unemployed" “失业” 20: "writer" “作家”二、RDD使用练习
1.获取电影评价top10
- 首先对数据进行切分,获取到电影ID出现情况
- 然后根据电影ID做聚合处理
- 通过调换电影ID和出现次数将出现次数换到key的位置
- 最后根据出现次数左排序处理
- 打印出前十个出现情况
object MovieLen {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val dataPath = "/home/ffzs/data/ml-1m"
val conf = new SparkConf()
conf.setAppName("movieLen")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val ratingsRdd = sc.textFile(f"$dataPath/ratings.dat")
// 电影评价次数排名 top10
ratingsRdd.map(_.split("::")) // 切分数据
.map(_(1) -> 1) // 获取id -> 1 的key-value
.reduceByKey(_+_) // 对电影id进行聚合计算出每个电影出现的次数
.map(it => (it._2, it._1)) // 调整key-value位置为了对出现次数做排序处理
.sortByKey(false) // 对出现次数进行排序
.take(10) // 获取前10个值
.foreach(println)
}
}
结果输出:
2.获取口碑top10- 通过电影ID做聚合求的电影的总分和总访问次数
- 然后对每个电影求平均分
- 最后排序输出top10
println("电影口碑(评分)排名 top10")
ratingsRdd.map(_.split("::"))
.map(it => (it(1), (it(2).toDouble, 1))) // 评分转化为double类型方便做除法计算
.reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)) // 通过电影的对总分和总观看次数做聚合
.map(it => ((it._2._1/it._2._2), it._1)) // 通过总分和总次数求得评论的平均分
.sortByKey(false) // 通过评论的平均分做排序处理
.take(10) // 获取top10
.foreach(println)
输出结果:
3.男性中口碑最好电影top10- 对评分和用户性别做join处理
- 然后根据性别做数据筛选
- 最后根据上面的口碑逻辑选出top10
val gender = "M"
val genderMap:Map[String, String] = Map("M"->"男性", "F"->"女性")
println(f"最受${genderMap(gender)}欢迎的电影 top10:")
ratingsRdd.map(_.split("::"))
.map(x => (x(0), (x(0), x(1), x(2))))
.join( // 对评分和用户的性别根据用户ID做join处理
usersRdd.map(_.split("::"))
.map(x=> x(0)->x(1))
)
.filter(_._2._2.equals(gender)) // 筛选出对应的性别
.map(it => (it._2._1._2, (it._2._1._3.toDouble, 1))) // 求平均分做排序
.reduceByKey((x,y) => (x._1+y._1, x._2+y._2))
.map(it => (it._2._1/it._2._2, it._1))
.sortByKey(ascending = false)
.map(it => it._2->it._1)
.take(10)
.foreach(println)
输出结果为:
4.评分根据时间进行二次排序- 构建通过评分和时间的二次排序处理类,评分和时间都是降序
- 通过排序类为key做排序处理
- 然后再输出每一行数据即可
class SecondSortKey(val first:Int, val second:Int) extends Ordered[SecondSortKey] with Serializable {
override def compare(that: SecondSortKey): Int = {
if (this.first != that.first) {
this.first-that.first
}
else{
this.second-that.second
}
}
}
ratingsRdd.map(line => {
val row = line.split("::")
((new SecondSortKey(row(2).toInt, row(3).toInt)), line)
})
.sortByKey(false)
.map(_._2)
.take(10)
.foreach(println)
}
5.电影类型top10
- 通过电影中的电影类型通过flatmap进行分割
- 然后统计每个类型出现次数
- 最后排序之后输出前10个
println("电影类型 top10")
movieRdd.map(_.split("::")(2))
.flatMap(_.split("\|"))
.map((_, 1))
.reduceByKey(_+_)
.map(it => (it._2, it._1))
.sortByKey(ascending = false)
.map(it=> it._2->it._1)
.take(10)
.foreach(println)
输出:
6.每日新增用户- 首先将时间戳转化为日期
- 然后通用户ID分组获取每一组中最小的日期,即为用户新增日期
- 然后再对该日期做聚合
- 最后进行排序输出
println("每日新增用户 top10")
val sdf = new SimpleDateFormat("yyyy-MM-dd")
ratingsRdd.map(_.split("::"))
.map(it => (it(0), it(3).toLong*1000))
.map(it => (it._1, sdf.format(it._2)))
.groupByKey()
.map(it => (it._2.min, 1))
.reduceByKey(_+_)
.map(it => it._2 -> it._1)
.sortByKey(ascending = false)
.map(it => it._2 -> it._1)
.take(10)
.foreach(println)
输出结果为:
完整代码:
object MovieLen {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val dataPath = "/home/ffzs/data/ml-1m"
val conf = new SparkConf()
conf.setAppName("movieLen")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val ratingsRdd = sc.textFile(f"$dataPath/ratings.dat")
val usersRdd = sc.textFile(f"$dataPath/users.dat")
val movieRdd = sc.textFile(f"$dataPath/movies.dat")
println("电影评价次数排名 top10")
ratingsRdd.map(_.split("::")) // 切分数据
.map(_(1) -> 1) // 获取id -> 1 的key-value
.reduceByKey(_+_) // 对电影id进行聚合计算出每个电影出现的次数
.map(it => (it._2, it._1)) // 调整key-value位置为了对出现次数做排序处理
.sortByKey(false) // 对出现次数进行排序
.take(10) // 获取前10个值
.foreach(println)
println("电影口碑(评分)排名 top10")
ratingsRdd.map(_.split("::"))
.map(it => (it(1), (it(2).toDouble, 1))) // 评分转化为double类型方便做除法计算
.reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)) // 通过电影的对总分和总观看次数做聚合
.map(it => ((it._2._1/it._2._2), it._1)) // 通过总分和总次数求得评论的平均分
.sortByKey(false) // 通过评论的平均分做排序处理
.take(10) // 获取top10
.foreach(println)
val gender = "M"
val genderMap:Map[String, String] = Map("M"->"男性", "F"->"女性")
println(f"最受${genderMap(gender)}欢迎的电影 top10:")
ratingsRdd.map(_.split("::"))
.map(x => (x(0), (x(0), x(1), x(2))))
.join( // 对评分和用户的性别根据用户ID做join处理
usersRdd.map(_.split("::"))
.map(x=> x(0)->x(1))
)
.filter(_._2._2.equals(gender)) // 筛选出对应的性别
.map(it => (it._2._1._2, (it._2._1._3.toDouble, 1))) // 求平均分做排序
.reduceByKey((x,y) => (x._1+y._1, x._2+y._2))
.map(it => (it._2._1/it._2._2, it._1))
.sortByKey(ascending = false)
.map(it => it._2->it._1)
.take(10)
.foreach(println)
println("评分时间二次排序: ")
ratingsRdd.map(line => {
val row = line.split("::")
((new SecondSortKey(row(2).toInt, row(3).toInt)), line)
})
.sortByKey(false)
.map(_._2)
.take(5)
.foreach(println)
println("电影类型 top10")
movieRdd.map(_.split("::")(2))
.flatMap(_.split("\|"))
.map((_, 1))
.reduceByKey(_+_)
.map(it => (it._2, it._1))
.sortByKey(ascending = false)
.map(it=> it._2->it._1)
.take(10)
.foreach(println)
println("每日新增用户 top10")
val sdf = new SimpleDateFormat("yyyy-MM-dd")
ratingsRdd.map(_.split("::"))
.map(it => (it(0), it(3).toLong*1000))
.map(it => (it._1, sdf.format(it._2)))
.groupByKey()
.map(it => (it._2.min, 1))
.reduceByKey(_+_)
.map(it => it._2 -> it._1)
.sortByKey(ascending = false)
.map(it => it._2 -> it._1)
.take(10)
.foreach(println)
}
}
class SecondSortKey(val first:Int, val second:Int) extends Ordered[SecondSortKey] with Serializable {
override def compare(that: SecondSortKey): Int = {
if (this.first != that.first) {
this.first-that.first
}
else{
this.second-that.second
}
}
}



