栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark常用算子逐一详解

Spark常用算子逐一详解

Spark常用算子逐一详解

一、什么是Spark rdd算子?二、算子的分类

Transformation算子Action算子 三、常用的Transformation算子及使用方法

1.map算子2.flatMap算子3.mapValues算子4.filter算子5.foreach算子6.groupBy算子6.groupByKey算子7.sortBy算子8.glom算子9.partitionBy算子10.repartition算子11.reduceByKey算子12.intersection算子13.union算子14.join算子15.distinct算子16.mapPartitions算子 四、常用的Action算子及使用方法

1.collect算子2.countByKey算子3.count算子4.reduce算子5.fold算子6.foreachPartition算子7.take算子8.takeOrdered算子9.takeSample算子9.top算子10.first算子11.saveAsTextFile算子 五、关于rdd算子的常见问题汇总

1.Action算子注意点2.groupByKey和reduceByKey的区别

以下算子使用代码均为pySpark,Spark版本3.2

一、什么是Spark rdd算子?

算子:分布式对象上的API称之为算子

方法函数:本地对象的API,叫做方法函数
算子:分布式对象的API,叫做算子

二、算子的分类

rdd算子分为两类

Transformation:转换算子Action:动作(行动)算子 Transformation算子

定义:RDD的算子,返回值仍旧是一个RDD的,称之为转换算子。
特征:这类算子是lazy 懒加载的,如果没有Action算子,Transformation算子是不工作的。

Action算子

定义:返回值不是rdd的就是action算子。

对于这两类算子来说,Transformation算子,相当于在构建执行计划,action是一个指令让这个执行计划开始工作。
如果没有action,Transformation算子之间的迭代关系,就是一个没有通电的流水线,只有action到来,这个数据处理流水线才开始工作。

三、常用的Transformation算子及使用方法 1.map算子

功能:map算子,是将rdd的数据一条一条处理(处理逻辑基于map算子中接受的数据函数),返回新的rdd。语法:
2.flatMap算子

功能:对rdd执行map操作,然后进行解嵌套操作接触嵌套:
嵌套的list:lst = [[1, 2, 3],[4, 5, 6],[7, 8, 9]]
如果解除了嵌套:lst = [1, 2, 3, 4, 5, 6, 7, 8, 9]演示代码:

rdd = sc.parallelize(["a, b, c", "a, c, e", "e, c, a"])
# 按照空格切分数据后解除嵌套
print(rdd.flatMap(lambda x: x.split(' ')).collect)

# 打印结果
[a, b, c, a, c, e, e, c, a]
3.mapValues算子

功能:针对二元元组rdd,对其内部的二元元组的value执行map操作。语法:

rdd.mapValues(func)
# func : (V) -> U
# 注意:传入的参数,是二元元组的value值。
# 我们这个传入的方法‘func’,只针对value进行处理。
代码示例:
rdd = sc.parallelize([('a', 1), ('a', 11), ('a', 6), ('b', 3), ('b', 5)])
# rdd.map(lambda x: (x[0], x[1] * 10))
# 将二元元组的所有value都诚意10进行处理
print(rdd.mapValues(lambda x: x * 10).collect())

# 打印结果
[('a', 10), ('a', 110), ('a', 60), ('b', 30), ('b', 50)]
4.filter算子

功能:过滤想要的数据进行保留语法:

rdd.filter(func)
# func : (T) -> bool : 传入一个参数进来随意类型,返回值必须是True or False

注意:这里返回值为True的值会被保留,False的数据会被过滤。

代码示例:
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 保留奇数
print(rdd.filter(lambda x: x % 2 == 1).collect)

# 打印结果
[2, 4]
5.foreach算子

功能:对rdd的每一个元素,执行你提供的逻辑的操作(和map的作用是一个意思),但是这个rdd是没有返回值的。用法:

rdd.forearh(func)
# func: (T) -> None
代码示例:
6.groupBy算子

功能:将rdd的数据进行分组语法:

rdd.groupBy(func)
# func 函数
# func: (T) -> k
# 函数要求传入一个参数,返回一个返回值,类型与所谓。
# 这个函数是 拿到你的返回值后,将所有相同返回值的放入一个组中
# 分组完成后,每一个组是一个二元组,key就是返回值,所有同组的数据放入一个迭代器中作为value。
代码示例:
sc.parallelize([1, 2, 3, 4, 5])
# 分组,将数字分层,偶数和奇数两个组
rdd2 = rdd.groupBy(lambda num: 'even' if (num % 2 == 0) else 'odd' )
# 将rdd2的元素的value值转为list,这样print可以输出内容。
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())

# 打印结果
[('even, [2, 4]'), ('odd', [1, 3, 5])]
6.groupByKey算子

功能:根据KV型rdd,自动按照key分组语法:rdd.groupByKey()代码示例:

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
groupByKey_rdd = rdd.groupByKey()
print(groupByKey_rdd.map(lambda x: (x[0], list(x[1]))).collect())

# 打印结果
[('b', [1, 1, 1]), ('a', [1, 1])]
7.sortBy算子

功能:将rdd数据进行排序,基于你指定的排序依据。语法:

rdd.sortBy(func, ascending=False, numPartitoins=1)
# func: (T) -> U :告知按照rdd中哪个数据进行排序,比如lambda x: x[1] 表示按照rdd中第二列元素进行排序。
# ascending=False:降序;ascending=True:升序。
# numPartitoins:用多少分区排序。
代码示例:
8.glom算子

功能:将rdd的数据加上嵌套,这个嵌套按照分区来进行。
比如rdd数据[1, 2, 3, 4, 5]有两个分区,被glom后变成数据 [ [1, 2, 3],[4, 5] ]语法:rdd.glom() 9.partitionBy算子

功能:对rdd进行自定义分区操作。语法:
10.repartition算子

功能:对rdd的分区进行重新分区。用法:

rdd.repartitioin(N)
# N: 重新分区数
代码
# repartition 重新分区(数量)
rdd2 = rdd.repartition(5)
print(rdd2.glom().collect())
rdd3 = rdd.repartition(1)
print(rdd3.glom(),collect())
注意事项

对分区的数量操作一定要谨慎
一般情况下,我们写Spark代码,除了要求全局排序要设置为一个partition以外,多数时候,所有代码中关于修改分区的API我们尽量不要修改。
如果修改分区,会影响并行计算的管道数量。

11.reduceByKey算子

功能:针对KV型rdd,自动按照key分组,然后按照你提供的聚合逻辑,完成==组内数据(value)==的聚合操作。

用法:

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接受两个传入参数(两个参数类型要一致),返回一个返回值,类型和传入要求一致。

代码示例:

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
result_rdd = rdd.reduceByKey(lambad a, b: a+b)
print(result_rdd.collect())

# 打印结果
[('b', 3),('a', 2)]

注意:

reduceBykey中传入的函数只负责聚合,不会负责分组的操作,分组一律按自动 by key分组。

reduceByKey原理
reduceByKey中每一个分组聚合的逻辑如下图所示

12.intersection算子

功能:求两个rdd的交集,返回一个新的rdd。用法:rdd.intersection(其他rdd)代码示例:

rdd1 = sc.parallelize([('a',1), ('b',1)])
rdd2 = sc.parallelize([('a',1), ('c',1)])
union_rdd = rdd1.intersection(rdd2)
print(union_rdd.collect())

# 打印结果
[('a', 1)]
13.union算子

功能:两个rdd合并成一个rdd返回用法:rdd.union(其他rdd)代码示例:

rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize(['a', 'b', 'c'])
union_rdd = rdd1.union(rdd2)
print(union_rdd.collect())

# 打印结果
[1, 2, 3, 4, 'a', 'b', 'c']
注意:

1.union操作只会合并,不会去重。
2.union不同类型的rdd也是可以的。
3.要区分开SQL中的union,Spark中的union是合并两个rdd中的内容。

14.join算子

功能:对两个rdd进行join操作(可以实现SQL的内外连接),注意:join算子只能用于二元元组。用法:

rdd.join(其他rdd) # 内连接
rdd.leftOuterJoin(其他rdd) # 左外连接
rdd.rigthOuterJoin(其他rdd) # 右外连接
代码示例:
# 部门id和员工姓名
rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])
# 部门id和部门名称
rdd2 = sc.parallelize([(1001, "sales"), (1002, "tech")])
join_rdd = rdd1.join(rdd2).collect()

# 结果
(1001, ("zhangsan", "sales"))
(1002, ("lisi", "tech"))
15.distinct算子

功能:对rdd的数据进行去重,返回新的rdd。用法:

rdd.distinct(参数) 
# 参数:去重分区数量,一般不用传。
16.mapPartitions算子

功能:与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

如图所示,mappartition一次被传递的是一整个分区的数据。作为一个迭代器(一整个list)传递过来。 四、常用的Action算子及使用方法 1.collect算子

功能:将rdd各个分区内的数据,统一收集到Driver中,形成一个list对象。用法:rdd.collect()注意事项:

1.这个算子,是将rdd各个分区的数据拉去到Driver中
2.要注意的是,rdd是分布式对象,数据量一般都很大,所以在使用这个算子之前一定要知道数据量有多大,,不然将大量数据拉回Driver会把Driver内存撑爆。

2.countByKey算子

功能:统计key出现的次数(一般用于KV型rdd)代码示例

rdd1 = sc.textFile("../data.words.txt")
rdd2 = rdd1.flatMap(lambda x: x.split(' '))
rdd3 = rdd2.map(lambda x: (x, 1))
# result 不是rdd
result = rdd3.countByKey()
print(result)
3.count算子

功能:统计rdd有多少条数据,返回的是一个数字用法:

rdd = sc.parallelize([1, 2, 3, 4, 5, 6]).count()
print(rdd)

# 打印结果
6
4.reduce算子

功能:对rdd数据集,按照你传入的逻辑进行聚合。语法

rdd.reduce(func)
# func: (T, T) -> T
# 2个参数传入,1个返回值,返回值和参数要求类型一致
代码示例:
rdd = sc.parallelize(range(1, 10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a, b: a + b))

# 打印结果
55
执行流程图如下
5.fold算子

功能: 和reduce是一样的,都是接收传入逻辑进行聚合。但是fold的聚合带有初始值。
这个初始值会作用在:

    分区内聚合分区间聚合
用法:
rdd.fold(参数, func)
# 参数:聚合时用的初始值
# func: (V, V) -> V  接受两个传入参数(两个参数类型要一致),返回一个返回值,类型和传入要求一致。
代码示例:
rdd = sc.parallelize(range(1, 10), 3)
print(rdd.fold(10, lambda a, b: a + b))

# 打印结果
85
代码解析:
    range函数是所闭右开的,所以数据格式是[[1, 2, 3], [4, 5, 6], [7, 8, 9]],数据分布在三个分区。分区一:1 2 3聚合的时候带上初始值(初始值:10)得到结果:16
    分区二:4 5 6聚合的时候带上初始值(初始值:10)得到结果:25
    分区三:7 8 9聚合的时候带上初始值(初始值:10)得到结果:34
    三个分区做聚合的时候也会加初始值(初始值:10)得到结果:10 + 16 + 25 + 34 = 85
6.foreachPartition算子

功能:和普通的foreach一致,一次梳理的是一整个分区的数据。就像一个没有返回值的mapPartitons算子。代码示例:

sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)

def ride10(data):
	print("------------------------------")
	result = list()
	for i in data:
		result.append(i * 10) # 将打他的数据读取出来,×10后插入list中。
	print(result)

rdd.foreachPartition(ride10)

# 打印结果:
------------------------------
[10, 30]
------------------------------
[20, 40]
------------------------------
[70, 90, 60]
7.take算子

取rdd的前N个元素,组成list返回代码示例:

sc.parallelize([3, 2, 1, 4, 5, 6]).take(5)
# 打印结果
[3, 2, 1, 4, 5]
8.takeOrdered算子

功能:对rdd进行排序取前N个用法:

rdd.takeOrdered(参数1, 参数2)
- 参数1:要几个数据
- 参数2:对排序的数据进行更改(不会更改排序的本身,只是在排序的时候更改排序的规则)
这个方法默认通过自然序升序排序,如果想用其他的排序规则,需要用参数2来对排序的数据进行处理。

代码示例:
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
print(rdd.takeOrdered(3))

# 若想倒叙排序,可以将原本的正数变成负数
print(rdd.takeOrdered(3, lambda x: -x))

# 打印结果
[1, 2, 3]
[9, 7, 6]
9.takeSample算子

功能:随机抽样rdd数据用法

takeSample(参数1: True or False,参数2:采样数,参数3:随机数种子)
参数一:True表示允许取同一个数据,false表示不允许取同一个数据。和数据内容无关,是否重复表示的是同一位置的数据。
参数二:抽样要几个
参数三:随机数种子,这个参数传入一个数字即可,随意给
随机数种子的数据可以随便传,如果传同一个数字,那么取出的结果就是一致的。
一般参数三我们不传,Spark会自动给予随机数的种子。

代码示例:

sc.parallelize([1, 1, 1, 1, 1], 1)
print(rdd.takeSample(True, 8))
9.top算子

功能:对rdd数据集进行降序排序,取前N个代码示例:

sc.parallelize([3, 2, 1, 4, 5, 6]).top(3) #top 3表示降序取前三个

# 打印结果
[6, 5, 4]
10.first算子

功能:取出rdd的第一个元素代码示例:

sc.parallelize([3, 2, 1]).first()
# 打印结果
3
11.saveAsTextFile算子

功能:将rdd的数据写入文本中,支持本地写出,hdfs等文件系统。代码示例:

sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
rdd.saveAsTextFile("hdfs://node1:8080/output/1111")
五、关于rdd算子的常见问题汇总 1.Action算子注意点

我们常用的action算子中:

foreachsaveAsTextFile

这两个算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。
反之:
其余的Action算子都会将结果发送给Driver。

2.groupByKey和reduceByKey的区别

1、功能上的区别

groupByKey仅仅有分组功能而已reduceByKey不仅有groupByKey的分组功能,还有reduce的聚合功能,所以是分组+聚合一体化的算子。

2.、如果使用这两个算子分别进行分组+聚合操作,在性能上,这两个算子差别会很大。
reduceByKey的性能是远大于groupByKey + 聚合逻辑的

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700159.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号