栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Spark RDD 行动算子

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark RDD 行动算子

1.reduce

函数签名 def reduce(f: (T, T) => T): T

代码:

object ActionDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15))
   println(rdd.reduce(_+_))
  }
}
 2.collect

函数签名 def collect(): Array[T]

代码: 

object ActionDemo2{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15))
    rdd.collect().foreach(println)
  }
}
 3.count

函数签名 def count(): Long

代码: 

object ActionDemo2{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15))
    val rdd2 =rdd.count()
    println(rdd2)
  }
}
 4.first

函数签名 def first(): T

返回 RDD 中的第一个元素

代码: 

object ActionDemo3{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15))
    val rdd2 =rdd.first()
    println(rdd2)
  }
}
5.take

函数签名 def take(num: Int): Array[T]

代码:

object ActionDemo4{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15))
    val rdd2 =rdd.take(2)
    rdd2.foreach(println)
  }
}
 6.takeOrdered

函数签名 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

代码:

object ActionDemo5{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    val rdd2 =rdd.takeOrdered(2)
    rdd2.foreach(println)
  }
}
 7.aggregate

函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

代码:

object ActionDemo6{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    //22 23 25 9 19 110
    //空分区也有默认值
    val rdd2 =rdd.aggregate(10)(_+_,_+_)
    println(rdd2)//318
  }
}
 8.fold

函数签名 def fold(zeroValue: T)(op: (T, T) => T): T

代码:

object ActionDemo7{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    //22 23 25 9 19 110
    //空分区也有默认值
    val rdd2 =rdd.fold(10)(_+_)
    println(rdd2)//318
  }
}
 9.countByKey

函数签名 def countByKey(): Map[K, Long]

代码: 

object ActionDemo8 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
    val rdd2 = rdd.countByKey()
    println(rdd2)

  }
}
 10.save 相关算子

函数签名

def saveAsTextFile(path: String): Unit

def saveAsObjectFile(path: String): Unit

def saveAsSequenceFile( path: String,

codec: Option[Class[_ <: CompressionCodec]] = None): Unit

将数据保存到不同格式的文件中

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")
 11.foreach

函数签名

def foreach(f: T => Unit): Unit = withScope {

val cleanF = sc.clean(f)

sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }

分布式遍历 RDD 中的每一个元素,调用指定函数

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/989523.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号