栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

PySpark之Spark RDD Action函数

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

PySpark之Spark RDD Action函数

四、Action函数

不同于Transformation操作,Action操作代表一次计算的结束,不再产生新的RDD,将结果返回到Driver程序或者输出到外部。所以Transformation操作只是建立管理,而Action操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob方法向集群正式提交请求,所以每个Action操作对应一个Job

五、Transformer算子

一、值类型ValueType

map:map(func):将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中的map算子相当于初始化一个RDD,新RDD叫做MappedRDD

[root@node1 /]# /export/server/spark/bin/pyspark 
> --master spark://node1:7077 
> --executor-memory 1g 
> --total-executor-cores 2
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd1.map(lambda x:x+1).collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]

groupBy

>>> x = sc.parallelize([1,2,3])
>>> y = x.groupBy(lambda x:'A' if (x%2 == 1) else 'B')
>>> print(y.mapValues(list).collect())
[('A', [1, 3]), ('B', [2])]

Filter:filter(func):选出所有func返回值为true的元素,生成一个新的RDD返回

>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd2 = rdd1.map(lambda x:x*2)
>>> rdd3 = rdd2.filter(lambda x:x>4)
>>> rdd3.collect()
[6, 8, 10, 12, 14, 16, 18]

flatMap():flatMap会先执行map的操作,再将所有对象合并为一个对象

>>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
>>> rdd2 = rdd1.flatMap(lambda x:x.split(" "))
>>> rdd2.collect()
['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
二、双值类型DoublevalueType

Union:对两个RDD求并集

>>> rdd1 = sc.parallelize([("a",1),("b",2)])
>>> rdd2 = sc.parallelize([("c",1),("b",3)])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd3.collect()
[('a', 1), ('b', 2), ('c', 1), ('b', 3)]

intersection:对两个RDD求交集

>>> rdd1 = sc.parallelize([("a",1),("b",2)])
>>> rdd2 = sc.parallelize([("c",1),("b",3)])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd4 = rdd3.intersection(rdd2)
>>> rdd4.collect()
[('b', 3), ('c', 1)]

groupByKey:以元组中的第0个元素作为Key,进行分组,返回一个新的RDD

>>> rdd1 = sc.parallelize([("a",1),("b",2)])
>>> rdd2 = sc.parallelize([("c",1),("b",3)])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd4 = rdd3.groupByKey()
>>> rdd4.collect()
[('b', ), ('c', ), ('a', )]
>>> result = rdd4.collect()
>>> result[0]
('b', )
>>> result[0][1]

>>> list(result[0][1])
[3, 2]

reduceByKey:将key相同的键值对,按照Function进行计算

>>> rdd = sc.parallelize([("a",1),("b",1),("a",1)])
>>> rdd.reduceByKey(lambda x,y:x+y).collect()
[('b', 1), ('a', 2)]

sortByKey:根据key进行排序

>>> tmp = [('a',1),('b',2),('1',3),('d',4),('2',5)]
>>> sc.parallelize(tmp).sortByKey().first()
('1', 3)
>>> sc.parallelize(tmp).sortByKey(True,1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey(True,2).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> tmp2 = [('Mary',1),('had',2),('a',3),('little',4),('lamb',5)]
>>> tmp2.extend([('whose',6),('fleece',7),('was',8),('white',9)])
>>> sc.parallelize(tmp2).sortByKey(True,3,keyfunc=lambda k:k.lower()).collect()
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]

countByValue

>>> x = sc.parallelize([1,3,1,2,3])
>>> y = x.countByValue()
>>> x.collect()
[1, 3, 1, 2, 3]
>>> y
defaultdict(, {1: 2, 3: 2, 2: 1})
六、Action算子

collect:返回一个list,list中包含RDD中的所有元素,只有当数据量较小的时候使用Collect,因为所有的结果都会加载到内存中

>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd2 = rdd1.map(lambda x:x+1)
>>> rdd2
PythonRDD[100] at RDD at PythonRDD.scala:53
>>> rdd2.collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]

reduce:将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止

>>> rdd1 = sc.parallelize([1,2,3,4,5])
>>> rdd1.reduce(lambda x,y:x+y)
15

first:返回RDD的第一个元素

>>> sc.parallelize([2,3,4]).first()
2

take:返回RDD的前N个元素

>>> sc.parallelize([2,3,4,5,6]).take(2)
[2, 3]
>>> sc.parallelize([2,3,4,5,6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(10),100).filter(lambda x:x>90).take(3)
[]
>>> sc.parallelize(range(100),100).filter(lambda x:x>90).take(3)
[91, 92, 93]
>>> sc.parallelize(range(90,100),100).filter(lambda x:x>90).take(3)
[91, 92, 93]
>>> sc.parallelize(range(90,100)).filter(lambda x:x>90).take(3)
[91, 92, 93]

top:排序取前几个,从大到小

>>> x = sc.parallelize([1,3,1,2,3])
>>> y = x.top(num=3)
>>> x.collect()
[1, 3, 1, 2, 3]
>>> y
[3, 3, 2]

count:返回RDD中元素的个数

>>> sc.parallelize([2,3,4]).count()
3

takeSample:抽样API

>>> rdd = sc.parallelize(range(0,10))
>>> rdd.takeSample(True,20,1)
[0, 6, 3, 4, 3, 1, 3, 7, 3, 5, 3, 0, 0, 9, 6, 5, 7, 9, 4, 7]
>>> rdd.takeSample(True,5,1)
[8, 8, 0, 3, 6]
>>> rdd.takeSample(True,5,1)
[8, 8, 0, 3, 6]
>>> rdd.takeSample(False,5,1)
[6, 8, 9, 7, 5]
>>> rdd.takeSample(False,5,2)
[5, 9, 3, 4, 6]
>>> rdd.takeSample(False,5,2)
[5, 9, 3, 4, 6]

foreach:仅返回满足foreach内函数条件元素。

def f(x):
    print(x)
sc.parallelize([1,2,3,4,5]).foreach(f)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/732620.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号