- RDD基础
- 创建rdd
- rdd基本操作
- 向spark传递函数
- python
- Scala
- java
rdd是spark中的基础数据单元,每个rdd被分为多个分区,可以包含Python、Java、Scala中任意类型的对象。
创建rdd- 读取外部数据集
lines = sc.textFile(“README.md”) - 分发驱动器程序中的对象集合(如list、set)
- 转化操作transformation
转化操作不进行实际计算和存储,只是记录计算的步骤(即惰性计算):
① map:将函数作用在rdd的每个元素中,函数返回结果作为结果rdd的值。
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x*x).collect()
② filter:将函数作用在每个rdd元素,返回符合函数条件的rdd
pythonlines = lines.filter(lambda line: “Python” in line)
③ flatMap:将函数作用在迭代器rdd上,将所有迭代器的返回值塞到同一个迭代器中返回
lines = sc.parallelize([”hello world“,“hi”])
words = lines.flatMap(lambda line: line.split(" "))
words.first() #返回“hello” - 行动操作action
行动操作进行实际计算,得出结果返回到驱动器程序中或者并存储到外部存储(如HDFS)中:
①reduce:接收两个同类型元素并将计算结果返回
sum = rdd.reduce(lambda x,y: x+y)
② pythonlines.first()
③ pythonlines.collect() #取全部值
④ pythonlines.count() #计数 - 持久化
Spark rdd惰性求值,每次调用行动操作时都会将前面的依赖重新计算一边,为了避免重复计算,可以将rdd持久化。
result = nums.map(lambda x: x*x)
result.persist() # 可以选择持久化级别:MEMORY_ONLY,MEMORY_ONLY_SER,MEMORY_AND_DISK,MEMORY_AND_DISK_SER,DISK_ONLY
print(result.count())
print(result.first())
result.unpersist()
- lambda函数
word = rdd.filter(lambda s : “error” in s) - 定义局部函数
注意:若在rdd函数中传递了某个对象的成员,spark会把成员所在整个对象都序列化发到工作节点上,传递的东西会比想象中大得多,如:
rdd.filter(lambda x: self.query in x) #会把整个self都保存到局部变量
query=self.query()
rdd.filter(lambda x: query in x)



