一、什么是Spark rdd算子?二、算子的分类
Transformation算子Action算子 三、常用的Transformation算子及使用方法
1.map算子2.flatMap算子3.mapValues算子4.filter算子5.foreach算子6.groupBy算子6.groupByKey算子7.sortBy算子8.glom算子9.partitionBy算子10.repartition算子11.reduceByKey算子12.intersection算子13.union算子14.join算子15.distinct算子16.mapPartitions算子 四、常用的Action算子及使用方法
1.collect算子2.countByKey算子3.count算子4.reduce算子5.fold算子6.foreachPartition算子7.take算子8.takeOrdered算子9.takeSample算子9.top算子10.first算子11.saveAsTextFile算子 五、关于rdd算子的常见问题汇总
1.Action算子注意点2.groupByKey和reduceByKey的区别
以下算子使用代码均为pySpark,Spark版本3.2
一、什么是Spark rdd算子?算子:分布式对象上的API称之为算子
方法函数:本地对象的API,叫做方法函数
算子:分布式对象的API,叫做算子
rdd算子分为两类
Transformation:转换算子Action:动作(行动)算子 Transformation算子
定义:RDD的算子,返回值仍旧是一个RDD的,称之为转换算子。
特征:这类算子是lazy 懒加载的,如果没有Action算子,Transformation算子是不工作的。
定义:返回值不是rdd的就是action算子。
三、常用的Transformation算子及使用方法 1.map算子对于这两类算子来说,Transformation算子,相当于在构建执行计划,action是一个指令让这个执行计划开始工作。
如果没有action,Transformation算子之间的迭代关系,就是一个没有通电的流水线,只有action到来,这个数据处理流水线才开始工作。
功能:map算子,是将rdd的数据一条一条处理(处理逻辑基于map算子中接受的数据函数),返回新的rdd。语法:
2.flatMap算子
功能:对rdd执行map操作,然后进行解嵌套操作接触嵌套:
嵌套的list:lst = [[1, 2, 3],[4, 5, 6],[7, 8, 9]]
如果解除了嵌套:lst = [1, 2, 3, 4, 5, 6, 7, 8, 9]演示代码:
rdd = sc.parallelize(["a, b, c", "a, c, e", "e, c, a"])
# 按照空格切分数据后解除嵌套
print(rdd.flatMap(lambda x: x.split(' ')).collect)
# 打印结果
[a, b, c, a, c, e, e, c, a]
3.mapValues算子
功能:针对二元元组rdd,对其内部的二元元组的value执行map操作。语法:
rdd.mapValues(func) # func : (V) -> U # 注意:传入的参数,是二元元组的value值。 # 我们这个传入的方法‘func’,只针对value进行处理。代码示例:
rdd = sc.parallelize([('a', 1), ('a', 11), ('a', 6), ('b', 3), ('b', 5)])
# rdd.map(lambda x: (x[0], x[1] * 10))
# 将二元元组的所有value都诚意10进行处理
print(rdd.mapValues(lambda x: x * 10).collect())
# 打印结果
[('a', 10), ('a', 110), ('a', 60), ('b', 30), ('b', 50)]
4.filter算子
功能:过滤想要的数据进行保留语法:
rdd.filter(func) # func : (T) -> bool : 传入一个参数进来随意类型,返回值必须是True or False
代码示例:注意:这里返回值为True的值会被保留,False的数据会被过滤。
rdd = sc.parallelize([1, 2, 3, 4, 5]) # 保留奇数 print(rdd.filter(lambda x: x % 2 == 1).collect) # 打印结果 [2, 4]5.foreach算子
功能:对rdd的每一个元素,执行你提供的逻辑的操作(和map的作用是一个意思),但是这个rdd是没有返回值的。用法:
rdd.forearh(func) # func: (T) -> None代码示例:
6.groupBy算子
功能:将rdd的数据进行分组语法:
rdd.groupBy(func) # func 函数 # func: (T) -> k # 函数要求传入一个参数,返回一个返回值,类型与所谓。 # 这个函数是 拿到你的返回值后,将所有相同返回值的放入一个组中 # 分组完成后,每一个组是一个二元组,key就是返回值,所有同组的数据放入一个迭代器中作为value。代码示例:
sc.parallelize([1, 2, 3, 4, 5])
# 分组,将数字分层,偶数和奇数两个组
rdd2 = rdd.groupBy(lambda num: 'even' if (num % 2 == 0) else 'odd' )
# 将rdd2的元素的value值转为list,这样print可以输出内容。
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
# 打印结果
[('even, [2, 4]'), ('odd', [1, 3, 5])]
6.groupByKey算子
功能:根据KV型rdd,自动按照key分组语法:rdd.groupByKey()代码示例:
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
groupByKey_rdd = rdd.groupByKey()
print(groupByKey_rdd.map(lambda x: (x[0], list(x[1]))).collect())
# 打印结果
[('b', [1, 1, 1]), ('a', [1, 1])]
7.sortBy算子
功能:将rdd数据进行排序,基于你指定的排序依据。语法:
rdd.sortBy(func, ascending=False, numPartitoins=1) # func: (T) -> U :告知按照rdd中哪个数据进行排序,比如lambda x: x[1] 表示按照rdd中第二列元素进行排序。 # ascending=False:降序;ascending=True:升序。 # numPartitoins:用多少分区排序。代码示例:
8.glom算子
功能:将rdd的数据加上嵌套,这个嵌套按照分区来进行。
比如rdd数据[1, 2, 3, 4, 5]有两个分区,被glom后变成数据 [ [1, 2, 3],[4, 5] ]语法:rdd.glom()
9.partitionBy算子
功能:对rdd进行自定义分区操作。语法:
10.repartition算子
功能:对rdd的分区进行重新分区。用法:
rdd.repartitioin(N) # N: 重新分区数代码
# repartition 重新分区(数量) rdd2 = rdd.repartition(5) print(rdd2.glom().collect()) rdd3 = rdd.repartition(1) print(rdd3.glom(),collect())注意事项
11.reduceByKey算子对分区的数量操作一定要谨慎
一般情况下,我们写Spark代码,除了要求全局排序要设置为一个partition以外,多数时候,所有代码中关于修改分区的API我们尽量不要修改。
如果修改分区,会影响并行计算的管道数量。
功能:针对KV型rdd,自动按照key分组,然后按照你提供的聚合逻辑,完成==组内数据(value)==的聚合操作。
用法:
rdd.reduceByKey(func) # func: (V, V) -> V # 接受两个传入参数(两个参数类型要一致),返回一个返回值,类型和传入要求一致。
代码示例:
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
result_rdd = rdd.reduceByKey(lambad a, b: a+b)
print(result_rdd.collect())
# 打印结果
[('b', 3),('a', 2)]
注意:
reduceBykey中传入的函数只负责聚合,不会负责分组的操作,分组一律按自动 by key分组。
reduceByKey原理
reduceByKey中每一个分组聚合的逻辑如下图所示
功能:求两个rdd的交集,返回一个新的rdd。用法:rdd.intersection(其他rdd)代码示例:
rdd1 = sc.parallelize([('a',1), ('b',1)])
rdd2 = sc.parallelize([('a',1), ('c',1)])
union_rdd = rdd1.intersection(rdd2)
print(union_rdd.collect())
# 打印结果
[('a', 1)]
13.union算子
功能:两个rdd合并成一个rdd返回用法:rdd.union(其他rdd)代码示例:
rdd1 = sc.parallelize([1, 2, 3, 4]) rdd2 = sc.parallelize(['a', 'b', 'c']) union_rdd = rdd1.union(rdd2) print(union_rdd.collect()) # 打印结果 [1, 2, 3, 4, 'a', 'b', 'c']注意:
14.join算子1.union操作只会合并,不会去重。
2.union不同类型的rdd也是可以的。
3.要区分开SQL中的union,Spark中的union是合并两个rdd中的内容。
功能:对两个rdd进行join操作(可以实现SQL的内外连接),注意:join算子只能用于二元元组。用法:
rdd.join(其他rdd) # 内连接 rdd.leftOuterJoin(其他rdd) # 左外连接 rdd.rigthOuterJoin(其他rdd) # 右外连接代码示例:
# 部门id和员工姓名
rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])
# 部门id和部门名称
rdd2 = sc.parallelize([(1001, "sales"), (1002, "tech")])
join_rdd = rdd1.join(rdd2).collect()
# 结果
(1001, ("zhangsan", "sales"))
(1002, ("lisi", "tech"))
15.distinct算子
功能:对rdd的数据进行去重,返回新的rdd。用法:
rdd.distinct(参数) # 参数:去重分区数量,一般不用传。16.mapPartitions算子
功能:与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
如图所示,mappartition一次被传递的是一整个分区的数据。作为一个迭代器(一整个list)传递过来。
四、常用的Action算子及使用方法
1.collect算子
功能:将rdd各个分区内的数据,统一收集到Driver中,形成一个list对象。用法:rdd.collect()注意事项:
2.countByKey算子1.这个算子,是将rdd各个分区的数据拉去到Driver中
2.要注意的是,rdd是分布式对象,数据量一般都很大,所以在使用这个算子之前一定要知道数据量有多大,,不然将大量数据拉回Driver会把Driver内存撑爆。
功能:统计key出现的次数(一般用于KV型rdd)代码示例
rdd1 = sc.textFile("../data.words.txt")
rdd2 = rdd1.flatMap(lambda x: x.split(' '))
rdd3 = rdd2.map(lambda x: (x, 1))
# result 不是rdd
result = rdd3.countByKey()
print(result)
3.count算子
功能:统计rdd有多少条数据,返回的是一个数字用法:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6]).count() print(rdd) # 打印结果 64.reduce算子
功能:对rdd数据集,按照你传入的逻辑进行聚合。语法
rdd.reduce(func) # func: (T, T) -> T # 2个参数传入,1个返回值,返回值和参数要求类型一致代码示例:
rdd = sc.parallelize(range(1, 10)) # 将rdd的数据进行累加求和 print(rdd.reduce(lambda a, b: a + b)) # 打印结果 55执行流程图如下
5.fold算子
功能: 和reduce是一样的,都是接收传入逻辑进行聚合。但是fold的聚合带有初始值。
这个初始值会作用在:
- 分区内聚合分区间聚合
rdd.fold(参数, func) # 参数:聚合时用的初始值 # func: (V, V) -> V 接受两个传入参数(两个参数类型要一致),返回一个返回值,类型和传入要求一致。代码示例:
rdd = sc.parallelize(range(1, 10), 3) print(rdd.fold(10, lambda a, b: a + b)) # 打印结果 85代码解析:
6.foreachPartition算子range函数是所闭右开的,所以数据格式是[[1, 2, 3], [4, 5, 6], [7, 8, 9]],数据分布在三个分区。分区一:1 2 3聚合的时候带上初始值(初始值:10)得到结果:16
分区二:4 5 6聚合的时候带上初始值(初始值:10)得到结果:25
分区三:7 8 9聚合的时候带上初始值(初始值:10)得到结果:34
三个分区做聚合的时候也会加初始值(初始值:10)得到结果:10 + 16 + 25 + 34 = 85
功能:和普通的foreach一致,一次梳理的是一整个分区的数据。就像一个没有返回值的mapPartitons算子。代码示例:
sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
def ride10(data):
print("------------------------------")
result = list()
for i in data:
result.append(i * 10) # 将打他的数据读取出来,×10后插入list中。
print(result)
rdd.foreachPartition(ride10)
# 打印结果:
------------------------------
[10, 30]
------------------------------
[20, 40]
------------------------------
[70, 90, 60]
7.take算子
取rdd的前N个元素,组成list返回代码示例:
sc.parallelize([3, 2, 1, 4, 5, 6]).take(5) # 打印结果 [3, 2, 1, 4, 5]8.takeOrdered算子
功能:对rdd进行排序取前N个用法:
代码示例:rdd.takeOrdered(参数1, 参数2)
- 参数1:要几个数据
- 参数2:对排序的数据进行更改(不会更改排序的本身,只是在排序的时候更改排序的规则)
这个方法默认通过自然序升序排序,如果想用其他的排序规则,需要用参数2来对排序的数据进行处理。
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1) print(rdd.takeOrdered(3)) # 若想倒叙排序,可以将原本的正数变成负数 print(rdd.takeOrdered(3, lambda x: -x)) # 打印结果 [1, 2, 3] [9, 7, 6]9.takeSample算子
功能:随机抽样rdd数据用法
takeSample(参数1: True or False,参数2:采样数,参数3:随机数种子)
参数一:True表示允许取同一个数据,false表示不允许取同一个数据。和数据内容无关,是否重复表示的是同一位置的数据。
参数二:抽样要几个
参数三:随机数种子,这个参数传入一个数字即可,随意给
随机数种子的数据可以随便传,如果传同一个数字,那么取出的结果就是一致的。
一般参数三我们不传,Spark会自动给予随机数的种子。
代码示例:
sc.parallelize([1, 1, 1, 1, 1], 1) print(rdd.takeSample(True, 8))9.top算子
功能:对rdd数据集进行降序排序,取前N个代码示例:
sc.parallelize([3, 2, 1, 4, 5, 6]).top(3) #top 3表示降序取前三个 # 打印结果 [6, 5, 4]10.first算子
功能:取出rdd的第一个元素代码示例:
sc.parallelize([3, 2, 1]).first() # 打印结果 311.saveAsTextFile算子
功能:将rdd的数据写入文本中,支持本地写出,hdfs等文件系统。代码示例:
sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
rdd.saveAsTextFile("hdfs://node1:8080/output/1111")
五、关于rdd算子的常见问题汇总
1.Action算子注意点
我们常用的action算子中:
foreachsaveAsTextFile
这两个算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。
反之:
其余的Action算子都会将结果发送给Driver。
1、功能上的区别
groupByKey仅仅有分组功能而已reduceByKey不仅有groupByKey的分组功能,还有reduce的聚合功能,所以是分组+聚合一体化的算子。
2.、如果使用这两个算子分别进行分组+聚合操作,在性能上,这两个算子差别会很大。
reduceByKey的性能是远大于groupByKey + 聚合逻辑的



