- educoder的spark学习
- 一、spark安装
- 二、pyspark的算子学习
- 2.1、Transformation - map
- 2.2、Transformation - mapPartitions
- 2.3、Transformation - filter
- 2.4、Transformation - flatMap
- 2.5、Transformation - distinct
- 2.6 、Transformation - sortBy
- 2.7、Transformation - sortByKey
- 2.8、Transformation - mapValues
- 2.9、Transformations - reduceByKey
- 2.10、Actions -pyspark常用算子
- 三、Spark RDD编程初级实践(scala)
- 3.1、数据去重
- 3.2、整合排序
- 3.3、求平均值
本次是在educoder这个平台上使用的,所以对于spark的安装方式是local本地模式,平台上有完整的安装步骤,在这里就不在继续叙述了,感谢理解
二、pyspark的算子学习 2.1、Transformation - map
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc =SparkContext("local","Simple App")
# 2.创建一个1到5的列表List
data=[1,2,3,4,5]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
需求:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 map 算子完成以上需求
rdd_map = rdd.map(lambda x: x**2 if x%2==0 else x**3)
# 6.使用rdd.collect() 收集完成 map 转换的元素
print(rdd_map.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
2.2、Transformation - mapPartitions
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
#********** Begin **********#
def f(iterator):
list = []
for x in iterator:
list.append((x,len(x)))
return list
#********** End **********#
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
data = ["dog", "salmon", "salmon", "rat", "elephant"]
rdd = sc.parallelize(data)
print(rdd.collect())
partitions = rdd.mapPartitions(f)
print(partitions.collect())
sc.stop()
2.3、Transformation - filter
filter 函数功能是对元素进行过滤,对每个元素应用f函数,返回值为 true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实现相当于生成。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
data = [1, 2, 3, 4, 5, 6, 7, 8]
rdd = sc.parallelize(data)
print(rdd.collect())
rdd_filter = rdd.filter(lambda x: x % 2 == 0)
print(rdd_filter.collect())
sc.stop()
2.4、Transformation - flatMap
flatMap扁平化操作
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD中每个集合的元素合并为一个集合,内部创建:
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
需求:
合并RDD的元素,例如:
([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
"""
# 5.使用 filter 算子完成以上需求
faltmap= rdd.flatMap(lambda x: x)
# 6.使用rdd.collect() 收集完成 filter 转换的元素
print(faltmap.collect())
# 7.停止 SparkContextsc.stio
sc.stop()
#********** End **********#
2.5、Transformation - distinct
distinct 将 RDD 中的元素进行去重操作。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
需求:
元素去重,例如:
1,2,3,3,2,1 --> 1,2,3
1,1,1,1, --> 1
"""
# 5.使用 distinct 算子完成以上需求
distinct = rdd.distinct()
# 6.使用rdd.collect() 收集完成 distinct 转换的元素
print(distinct.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
2.6 、Transformation - sortBy
该函数最多可以传三个参数:
- 第一个参数是一个函数,排序规则;
- 第二个参数是 ascending ,从字面的意思大家应该可以猜到,是的,这参数决定排序后 RDD 中的元素是升序还是降序,默认是 true ,也就是升序;
- 第三个参数是 numPartitions ,该参数决定排序后的 RDD 的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口}
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
data =[1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
5,4,3,1,2 --> 1,2,3,4,5
"""
# 5.使用 sortBy 算子完成以上需求
by = rdd.sortBy(lambda x: x)
# 6.使用rdd.collect() 收集完成 sortBy 转换的元素
print(by.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
2.7、Transformation - sortByKey
ascending参数是指排序(升序还是降序),默认是升序。numPartitions参数是重新分区,默认与上一个RDD保持一致。keyfunc参数是排序规则。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
data = [('B', 1), ('A', 2), ('C', 3)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]
"""
# 5.使用 sortByKey 算子完成以上需求
key = rdd.sortByKey()
# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
print(key.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
2.8、Transformation - mapValues
mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
需求:
元素(key,value)的value进行以下操作:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 mapValues 算子完成以上需求
values = rdd.mapValues(lambda x: x**2 if x%2==0 else x**3)
# 6.使用rdd.collect() 收集完成 mapValues 转换的元素
print(values.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
2.9、Transformations - reduceByKey
reduceByKey 算子,只是两个值合并成一个值,比如叠加。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
需求:
元素(key-value)的value累加操作,例如:
(1,1),(1,1),(1,2) --> (1,4)
(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)
"""
# 5.使用 reduceByKey 算子完成以上需求
print(rdd.reduceByKey(lambda x,y:x+y).collect())
# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
sc.stop()
2.10、Actions -pyspark常用算子
count
count():返回 RDD 的元素个数。
示例:
sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.count())
输出:
5
first
first():返回 RDD 的第一个元素(类似于take(1))。
示例:
sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.first())
输出:
python
take
take(n):返回一个由数据集的前 n 个元素组成的数组。
示例:
sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.take(2))
输出:
[‘python’, ‘python’]
reduce
reduce():通过func函数聚集 RDD 中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。
示例:
sc = SparkContext("local", "Simple App")
data = [1,1,1,1]
rdd = sc.parallelize(data)
print(rdd.reduce(lambda x,y:x+y))
输出:
4
collect
collect():在驱动程序中,以数组的形式返回数据集的所有元素。
示例:
sc = SparkContext("local", "Simple App")
data = [1,1,1,1]
rdd = sc.parallelize(data)
print(rdd.collect())
输出:
[1,1,1,1]
代码答案:
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.收集rdd的所有元素并print输出
rdd = sc.parallelize(data)
print(rdd.collect())
print(rdd.count())
# 5.统计rdd的元素个数并print输出
print(rdd.first())
# 6.获取rdd的第一个元素并print输出
print(rdd.take(3))
# 7.获取rdd的前3个元素并print输出
print(rdd.reduce(lambda x,y:x+y))
# 8.聚合rdd的所有元素并print输出
# print(rdd.collect())
# 9.停止 SparkContext
sc.stop()
# ********** End **********#
三、Spark RDD编程初级实践(scala)
基于上面的pyspark几个算子基本上已经理解了,在scala中算子的概念是一样的
3.1、数据去重import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object RemDup {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RemDup").setMaster("local")
val sc = new SparkContext(conf)
//输入文件fileA.txt和fileB.txt已保存在本地文件系统/root/step1_files目录中
val dataFile = "file:///root/step1_files"
val data = sc.textFile(dataFile, 2)
/********** Begin **********/
//第一步:执行过滤操作,把空行丢弃。
val rdd1 = data.filter(_.trim().length > 0)
//第二步:执行map操作,取出RDD中每个元素,去除尾部空格并生成一个(key, value)键值对。
val rdd2 = rdd1.map(line => (line.trim, ""))
//第三步:执行groupByKey操作,把所有key相同的value都组织成一个value-list。
val rdd3 = rdd2.groupByKey()
//第四步:对RDD进行重新分区,变成一个分区,
//在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。
val rdd4 = rdd3.partitionBy(new HashPartitioner(1))
//第五步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。
val rdd5 = rdd4.sortByKey()
//第六步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。
val rdd6 = rdd5.keys
//第七步:执行collect操作,以数组的形式返回RDD中所有元素。
val rdd7 = rdd6.collect()
//第八步:执行foreach操作,并使用println打印出数组中每个元素的值。
println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
rdd7.foreach(println)
/********** End **********/
}
}
3.2、整合排序
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object FileSort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FileSort").setMaster("local")
val sc = new SparkContext(conf)
//输入文件file1.txt、file2.txt和file3.txt已保存在本地文件系统/root/step2_files目录中
val dataFile = "file:///root/step2_files"
val data = sc.textFile(dataFile, 3)
/********** Begin **********/
//第一步:执行过滤操作,把空行丢弃。
val rdd1 = data.filter(_.trim().length > 0)
//第二步:执行map操作,取出RDD中每个元素,去除尾部空格并转换成整数,生成一个(key, value)键值对。
val rdd2 = rdd1.map(line => (line.trim.toInt, ""))
//第三步:对RDD进行重新分区,变成一个分区,
//在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。
val rdd3 = rdd2.partitionBy(new HashPartitioner(1))
//第四步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。
val rdd4 = rdd3.sortByKey()
//第五步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。
val rdd5 = rdd4.keys
//第六步:执行map操作,取出RDD中每个元素,生成一个(key, value)键值对,
//其中key是整数的排序位次,value是原待排序的整数。
var index = 0
val rdd6 = rdd5.map(t => {
index = index + 1
(index, t)
})
//第七步:执行collect操作,以数组的形式返回RDD中所有元素。
val rdd7 = rdd6.collect()
//第八步:执行foreach操作,依次遍历数组中每个元素,分别取出(key, value)键值对中key和value,
//按如下格式输出:key value
println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
rdd7.foreach(t => println(t._1 + " " + t._2))
/********** End **********/
}
}
3.3、求平均值
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object AvgScore {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FileSort").setMaster("local")
val sc = new SparkContext(conf)
//输入文件AlgorithmScore.txt、DatabaseScore.txt和PythonScore.txt已保存在本地文件系统/root/step3_files目录中
val dataFile = "file:///root/step3_files"
val data = sc.textFile(dataFile)
/********** Begin **********/
//第一步:执行过滤操作,把空行丢弃。
val rdd1 = data.filter(_.trim().length > 0)
//第二步:执行map操作,取出RDD中每个元素(即一行文本),以空格作为分隔符将一行文本拆分成两个字符串,
//拆分后得到的字符串封装在一个数组对象中,成为新的RDD中一个元素。
var rdd2 = rdd1.map(line => line.split(" "))
//第三步:执行map操作,取出RDD中每个元素(即字符串数组),取字符串数组中第一个元素去除尾部空格,
//取字符串数组中第二个元素去除尾部空格并转换成整数,并由这两部分构建一个(key, value)键值对。
val rdd3 = rdd2.map(t => (t(0).trim, t(1).trim.toInt))
//第四步:执行mapValues操作,取出键值对RDD中每个元素的value,使用x=>(x,1)这个匿名函数进行转换。
val rdd4 = rdd3.mapValues(x => (x, 1))
//第五步:执行reduceByKey操作,计算出每个学生所有课程的总分数和总课程门数。
val rdd5 = rdd4.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
//第六步:执行mapValues操作,计算出每个学生的平均成绩。
val rdd6 = rdd5.mapValues(x => (x._1.toDouble / x._2))
//第七步:执行collect操作,以数组的形式返回RDD中所有元素。
val rdd7 = rdd6.collect()
//第八步:执行foreach操作,按如下格式打印出每个学生的平均成绩:姓名 成绩,其中成绩要求保留两位小数。
println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
rdd7.foreach(t => {
val x = t._2
println(t._1 + " " + f"$x%1.2f")
})
/********** End **********/
}
}
最后,感谢阅读,如有帮助,一键三连哈。



