
Scala RDD Action Operators


Contents
  • RDD Action Operators
    • reduce && collect && count && first && take && takeOrdered
    • aggregate
    • wordcount with the eight methods covered so far
    • save
    • foreach

RDD Action Operators

An action operator is a method that triggers the execution of a job (Job).
Under the hood it calls the runJob method of the environment object (SparkContext),
which creates an ActiveJob and submits it for execution.
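
A minimal sketch of that difference (the object name rdd_action_trigger_demo is made up for illustration; the SparkConf settings follow the examples below): a transformation alone does not start a job, only the action does.

import org.apache.spark.{SparkConf, SparkContext}

object rdd_action_trigger_demo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // map is a transformation: it only records the lineage, no job is submitted yet
    val mapped = rdd.map(_ * 2)
    // collect is an action: it calls sc.runJob internally, which is when the job actually runs
    println(mapped.collect().mkString(","))

    sc.stop()
  }
}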

reduce && collect && count && first && take && takeOrdered

reduce aggregates the data according to the function passed in and returns the result directly; collect gathers the data into the Driver's memory in partition order, forming an array.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object rdd_action_reduce_collect_count_first_take_takeOrdered {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val reduceRDD: Int = rdd.reduce(_ + _)
    println(reduceRDD)

    //collect the data into the Driver's memory in partition order, forming an array
    val collectRDD: Array[Int] = rdd.collect()
    println(collectRDD.mkString(","))

    //get the number of elements in the data source
    val countRDD: Long = rdd.count()
    println(countRDD)

    //get the first element of the data source
    val firstRDD: Int = rdd.first()
    println(firstRDD)

    //take the first n elements
    val takeRDD: Array[Int] = rdd.take(3)
    println(takeRDD.mkString(","))

    //take the first n elements after sorting
    val rdd1: RDD[Int] = sc.makeRDD(List(4,2,3,1))
    val takeOrderedRDD: Array[Int] = rdd1.takeOrdered(3)
    println(takeOrderedRDD.mkString(","))
    sc.stop()
  }
}


aggregate

aggregate: the initial value takes part not only in the intra-partition computation but also in the inter-partition computation
aggregateByKey: the initial value only takes part in the intra-partition computation

This can be seen directly from the code:

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val aggreRDD: Int = rdd.aggregate(10)(_ + _, _ + _)

For the first partition (1, 2): 10 + 1 = 11, then 11 + 2 = 13
For the second partition (3, 4): 10 + 3 = 13, then 13 + 4 = 17
Between partitions: 10 + 13 = 23, then 23 + 17 = 40
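
To make the comparison runnable, here is a small sketch (the object name rdd_action_aggregate_demo and the key "a" are made up for illustration) contrasting aggregate with aggregateByKey on two partitions:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object rdd_action_aggregate_demo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    // the initial value 10 is used inside each partition AND once more between partitions:
    // partition 1: 10+1+2 = 13, partition 2: 10+3+4 = 17, merge: 10+13+17 = 40
    println(rdd.aggregate(10)(_ + _, _ + _)) // 40

    val kvRdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
    // the initial value 10 is only used inside each partition:
    // partition 1: 10+1+2 = 13, partition 2: 10+3+4 = 17, merge: 13+17 = 30
    kvRdd.aggregateByKey(10)(_ + _, _ + _).collect().foreach(println) // (a,30)

    sc.stop()
  }
}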

wordcount with the eight methods covered so far
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object rdd_action_wc8种方法 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)


    wordcount1(sc)
    sc.stop()
  }

  //groupBy
  def wordcount1(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordgroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val wordcount: RDD[(String, Int)] = wordgroup.mapValues(_.size)
    wordcount.collect().foreach(println)
  }

  //groupByKey
  def wordcount2(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val group: RDD[(String, Iterable[Int])] = newwords.groupByKey()
    val wordcount: RDD[(String, Int)] = group.mapValues(_.size)
    wordcount.collect().foreach(println)
  }

  //reduceByKey
  def wordcount3(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.reduceByKey(_+_)
    wordcount.collect().foreach(println)
  }

  //aggregateByKey
  def wordcount4(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.aggregateByKey(0)(_+_,_+_)
    wordcount.collect().foreach(println)
  }

  //foldByKey
  def wordcount5(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.foldByKey(0)(_+_)
    wordcount.collect().foreach(println)
  }

  //combineByKey
  def wordcount6(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.combineByKey(
      v=>v,
      (x:Int,y)=>(x+y),
      (x:Int,y:Int)=>(x+y)
    )
    wordcount.collect().foreach(println)
  }

  //countByKey
  def wordcount7(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: collection.Map[String, Long] = newwords.countByKey()
    println(wordcount.mkString(","))
  }

  //countByValue
  def wordcount8(sc: SparkContext) ={
    val rdd= sc.makeRDD(List(
      ("hello spark"),("hello scala"),("hello java")
    ))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordcount: collection.Map[String, Long] = words.countByValue()
    println(wordcount.mkString(","))
  }
}

save

These operators save the data of an RDD to files in different formats:

   rdd.saveAsTextFile("output")      //the most commonly used
   rdd.saveAsObjectFile("output1")
   rdd.saveAsSequenceFile("output2") //requires the data to be of key-value type
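
As a runnable sketch (the object name rdd_action_save and the output directory names "output", "output1", "output2" are arbitrary; the run fails if those directories already exist), using a key-value RDD so that saveAsSequenceFile is available:

import org.apache.spark.{SparkConf, SparkContext}

object rdd_action_save {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))

    // plain text file, one element per line
    rdd.saveAsTextFile("output")
    // Java object serialization
    rdd.saveAsObjectFile("output1")
    // Hadoop SequenceFile: only available here because rdd is a key-value RDD
    rdd.saveAsSequenceFile("output2")

    sc.stop()
  }
}
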
foreach

foreach can be used in two different ways:

    rdd.foreach(println)
    println("///")
    rdd.collect().foreach(println)

rdd.foreach(println) runs on the executors: in production the data is distributed to different executors, which apply the function in parallel, so there is no guaranteed output order.

In rdd.collect().foreach(println), the foreach is just the ordinary traversal of the in-memory collection on the Driver side, so the output is in order.
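
A complete runnable sketch of the two variants (the object name rdd_action_foreach is made up for illustration):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object rdd_action_foreach {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // executed on the executors, in parallel per partition: print order is not guaranteed
    rdd.foreach(println)
    println("///")
    // collect() first pulls the data back to the Driver, then foreach iterates the local Array: order is preserved
    rdd.collect().foreach(println)

    sc.stop()
  }
}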
