栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RDD编程

RDD编程

文章目录
  • RDD基础
    • 创建rdd
    • rdd基本操作
  • 向spark传递函数
    • python
    • Scala
    • java

RDD基础

rdd是spark中的基础数据单元,每个rdd被分为多个分区,可以包含Python、Java、Scala中任意类型的对象。

创建rdd
  1. 读取外部数据集
    lines = sc.textFile(“README.md”)
  2. 分发驱动器程序中的对象集合(如list、set)
rdd基本操作
  1. 转化操作transformation
    转化操作不进行实际计算和存储,只是记录计算的步骤(即惰性计算):
    ① map:将函数作用在rdd的每个元素中,函数返回结果作为结果rdd的值。
    nums = sc.parallelize([1,2,3,4])
    squared = nums.map(lambda x: x*x).collect()
    ② filter:将函数作用在每个rdd元素,返回符合函数条件的rdd
    pythonlines = lines.filter(lambda line: “Python” in line)
    ③ flatMap:将函数作用在迭代器rdd上,将所有迭代器的返回值塞到同一个迭代器中返回
    lines = sc.parallelize([”hello world“,“hi”])
    words = lines.flatMap(lambda line: line.split(" "))
    words.first() #返回“hello”
  2. 行动操作action
    行动操作进行实际计算,得出结果返回到驱动器程序中或者并存储到外部存储(如HDFS)中:
    ①reduce:接收两个同类型元素并将计算结果返回
    sum = rdd.reduce(lambda x,y: x+y)
    ② pythonlines.first()
    ③ pythonlines.collect() #取全部值
    ④ pythonlines.count() #计数
  3. 持久化
    Spark rdd惰性求值,每次调用行动操作时都会将前面的依赖重新计算一边,为了避免重复计算,可以将rdd持久化。
    result = nums.map(lambda x: x*x)
    result.persist() # 可以选择持久化级别:MEMORY_ONLY,MEMORY_ONLY_SER,MEMORY_AND_DISK,MEMORY_AND_DISK_SER,DISK_ONLY
    print(result.count())
    print(result.first())
    result.unpersist()
向spark传递函数 python
  1. lambda函数
    word = rdd.filter(lambda s : “error” in s)
  2. 定义局部函数
    注意:若在rdd函数中传递了某个对象的成员,spark会把成员所在整个对象都序列化发到工作节点上,传递的东西会比想象中大得多,如:
    rdd.filter(lambda x: self.query in x) #会把整个self都保存到局部变量
    query=self.query()
    rdd.filter(lambda x: query in x)
Scala java
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/303825.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号