学习资料 林子雨《Spark编程基础》
不要老想着转换为(key, value)对!
不要老想着转换为(key, value)对!
不要老想着转换为(key, value)对!
怎么简单怎么来!
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
textFile = sc.textFile("file:///home/hadoop/jupyternotebook/chapter4-data01.txt")
textFile.collect()[0:5]
['Aaron,OperatingSystem,100', 'Aaron,Python,50', 'Aaron,ComputerNetwork,30', 'Aaron,Software,94', 'Abbott,Database,18']
# 1. 该系总共有多少名学生
rdd = textFile.map(lambda line : line.split(","))
# 取出名字这一行
rdd1 = rdd.map(lambda line : line[0])
# 名字转换为KV对,然后加和取键,即为学生个数
rdd2 = rdd1.map(lambda name : (name, 1)).reduceByKey(lambda a, b : a + b).keys().count()
print(rdd2)
# 265
# 1. 答案做法 # 去重再用count distinct_rdd1 = rdd1.distinct() distinct_rdd1.count() # 265
# 2. 该系共开设多少门课程 rdd3 = rdd.map(lambda line : line[1]) rdd4 = rdd3.map(lambda name : (name, 1)).reduceByKey(lambda a, b : a + b).keys().count() print(rdd4) # 答案做法 和第1题一样 8
# 3. Tom同学的总成绩平均分是多少
# 取出Tom行
Tom = rdd.filter(lambda line : line[0] == "Tom")
Tom.collect()
# [['Tom', 'Database', '26'],
# ['Tom', 'Algorithm', '12'],
# ['Tom', 'OperatingSystem', '16'],
# ['Tom', 'Python', '40'],
# ['Tom', 'Software', '60']]
# str转换为int key, value) ---> (key, (value, 1))
rdd5 = Tom.map(lambda line : (line[0], (int(line[2]), 1)))
rdd5.collect()
# [('Tom', (26, 1)),
# ('Tom', (12, 1)),
# ('Tom', (16, 1)),
# ('Tom', (40, 1)),
# ('Tom', (60, 1))]
# 计算总和
rdd6 = rdd5.reduceByKey(lambda a, b : (a[0] + b[0], a[1] + b[1])) # 括号不要忘了
rdd6.collect()
# [('Tom', (154, 5))]
# 计算平均数
rdd7 = rdd6.mapValues(lambda a : a[0] / a[1])
rdd7.collect()
# [('Tom', 30.8)]
# 3. 答案做法 score = Tom.map(lambda x:int(x[2])) //提取Tom同学的每门成绩,并转换为int类型 num = Tom.count() //Tom同学选课门数 sum_score = score.reduce(lambda x,y:x+y) //Tom同学的总成绩 avg = sum_score/num // 总成绩/门数=平均分 print(avg) # 30.8 # 我已经被KV对的思维给禁锢了!
在做题之前最好能够在纸上写出流程,这样可以一行代码解决
# 4. 每名同学的选修的课程总数
# rdd.map(lambda line : (line[0], 1)).reduceByKey(lambda a, b : a + b).collect()[0 : 5]
# [('Aaron', 4), ('Abbott', 3), ('Abel', 4), ('Abraham', 3), ('Adair', 3)]
rdd.map(lambda line : (line[0], 1)).reduceByKey(lambda a, b : a + b).count()
# 265
# 5. 该系Database课程共有多少人选秀
rdd.filter(lambda line : line[1] == "Database").map(lambda line : (line[1], 1))
.reduceByKey(lambda a, b : a + b).collect()
# count() 表示只有一行
# [('Database', 126)]
# 答案 rdd.filter(lambda line : line[1] == "Database").count() 很方便
# 6. 各门课程的平均分是多少 # Error str ---> int rdd.map(lambda line : (line[1], (int(line[2]), 1))).reduceByKey(lambda a, b : (a[0] + b[0], a[1] + b[1])) .mapValues(lambda a : a[0] / a[1]).collect() # mapValues(lambda a : round(a[0]/a[1], 2).collect() 保留两位小数
[('OperatingSystem', 54.940298507462686),
('Python', 57.8235294117647),
('ComputerNetwork', 51.901408450704224),
('Software', 50.90909090909091),
('Database', 50.53968253968254),
('Algorithm', 48.833333333333336),
('DataStructure', 47.57251908396947),
('CLanguage', 50.609375)]
# 7. 使用累加器计算共有多少人选修Database这门课 rdd.filter(lambda line : line[1] == "Database").count() # 126 # 答案 res = rdd.filter(lambda x:x[1]=="Database") accum = sc.accumulator(0) # 定义一个从0开始的累加器accum res.foreach(lambda x:accum.add(1)) # 遍历res,每扫描一条数据,累加器加1 accum.value # 输出累加器的最终值 # 126



