栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RDD常用算子整理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RDD常用算子整理

相关算子整理

1.Action2.Transformation3.key-valueRDD的操作4.缓存操作5. 共享变量

from pyspark import SparkConf,SparkContext
import findspark 
findspark.init()
conf=SparkConf().setAppName('RD').setMaster('local[*]')
sc=SparkContext(conf=conf)
1.Action
collect将数据汇集到Driver,数据过大时有超内存风险
take将前若干个数据汇集到Driver,相比collect安全
takeSample可以随机取若干个到Driver,第一个参数设置是否放回抽样
first、count取第一个数和统计数量
reduce利用二元函数对数据进行规约
rdd.reduce(lambda x,y:x+y)
foreachaccum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
accum.value
countByKey按key统计个数
saveAsTextFile保存rdd成text文件到本地
重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
2.Transformation
map对每个元素进行映射
rdd.map(lambda x:x**2).collect()
filter根据条件过滤数据
rdd.filter(lambda x:x>5)
flatMap将每个元素生成一个Array后压平
多用于词频统计结合split
sample对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
distinct去重
subtract找到属于前一个rdd而不属于后一个rdd的元素
a.subtract(b)
union并集
intersection交集
cartesian笛卡尔积
sortBy排序
zip按照拉链的方式连接两个RDD,类似python的zip函数
要求两个RDD具有相同的分区,每个分区元素数量相同
zipWithIndex从0开始的递增序列按照拉链方式连接
3.key-valueRDD的操作

PairRDD是指数据长度为2的tuple,类型的数据

reduceByKey对相同的key对应的value应用于二元聚合操作
rdd.reduceByKey(lambda x,y:x+y).collect()
groupByKey分组操作,得到iterator
sortByKey按照key排序,可以指定是否降序
rdd.sortByKey(False).collect()
join相当于根据key进行内连接
leftOuterJoin和rightOuterJoin类似于表的左外和右外连接
cogroup对两个输入分别goupByKey然后再对结果进行groupByKey
subtractByKey去除x中那些key也在y中的元素
x.subtractByKey(y).collect()
foldByKey操作和reduceByKey类似,但是要提供一个初始值
x.foldByKey(1,lambda x,y:x*y).collect()
4.缓存操作

当一个rdd被多个任务用作中间变量时,对其进行cache缓存到内存中,加快计算。
声明对一个rdd进行cache后,该rdd不会被立即缓存,而是等到它第一次被计算出来时才进行缓存。
可以使用persist明确指定存储级别,常用的存储级别是MEMORY_ONLY和EMORY_AND_DISK。
unpersist释放缓存,unpersist是立即执行的
如果要切断血缘关系,可以用checkpoint设置检查点将某个rdd保存到磁盘中。

声明对一个rdd进行checkpoint后,该rdd不会被立即保存到磁盘,而是等到它第一次被计算出来时才保存成检查点

cachecache缓存到内存中,使用存储级别 MEMORY_ONLY
MEMORY_ONLY意味着如果内存存储不下,放弃存储其余部分,需要时重新计算。
persist缓存到内存或磁盘中,默认使用存储级别MEMORY_AND_DISK
MEMORY_AND_DISK意味着如果内存存储不下,其余部分存储到磁盘中
persist可以指定其它存储级别,cache相当于persist(MEMORY_ONLY)
a.unpersist() #立即释放缓存
checkpoint将数据设置成检查点,写入到磁盘中
unpersist释放内存
#checkpoint 将数据设置成检查点,写入到磁盘中。
sc.setCheckpointDir("./data/checkpoint/")
rdd_students = sc.parallelize(["LiLei","Hanmeimei","LiLy","Ann"],2)

rdd_students_idx = rdd_students.zipWithIndex() 

#设置检查点后,可以避免重复计算,不会因为zipWithIndex重复计算触发不一致的问题
rdd_students_idx.checkpoint() 
rdd_students_idx.take(3)
5. 共享变量

​ 当spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。但是,有时候需要在不同节点或者节点和Driver之间共享变量。

Spark提供两种类型的共享变量,广播变量和累加器。
broadcast

广播变量是不可变变量,实现在不同节点不同任务之间共享数据。广播变量在每个机器上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。

accumulator

累加器主要是不同节点和Driver之间共享变量,只能实现计数或者累加功能。

累加器的值只有在Driver上是可读的,在节点上不可见

广播变量

累加器

6.分区操作

分区操作一般是改变分区操作和转换操作

算子说明
glom将一个分区内的数据转换为一个列表作为一行
coalesceshuffle可选,默认为False情况下窄依赖,不能增加分区。repartition和partitionBy调用它实现
repartition按随机数进行shuffle,相同key不一定在同一个分区
partitionBy按key进行shuffle,相同key放入同一个分区
HashPartitioner默认分区器,根据key的hash值进行分区,
相同的key进入同一分区,效率较高,key不可为Array.
RangePartitioner只在排序相关函数中使用,除相同的key进入同一分区,相邻的key也会进入同一分区,key必须可排序。
TaskContext获取当前分区id方法
TaskContext.get.partitionId
mapPartitions次处理分区内的一批数据,适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支
mapPartitionsWithIndex类似mapPartitions,提供了分区索引,输入参数为(i,Iterator)
foreachPartition类似foreach,但每次提供一个Partition的一批数据
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762815.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号