- RDD action operators
- reduce && collect && count && first && take && takeOrdered
- aggregate
- wordcount implemented with the eight methods covered earlier
- save
- foreach
An action operator is a method that triggers job (Job) execution.
Under the hood it calls the runJob method of the environment object (SparkContext).
The underlying code creates an ActiveJob and submits it for execution.
reduce aggregates the data according to the function passed in and returns the result directly; collect gathers the data, in partition order, into the Driver's memory and returns it as an array.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object rdd_action_reduce_collect_count_first_take_takeOrdered {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    // reduce: aggregate all elements with the given function and return the result to the Driver
    val reduceRDD: Int = rdd.reduce(_ + _)
    println(reduceRDD)
    // collect: gather the data, in partition order, into an array in the Driver's memory
    val collectRDD: Array[Int] = rdd.collect()
    println(collectRDD.mkString(","))
    // count: number of elements in the data source
    val countRDD: Long = rdd.count()
    println(countRDD)
    // first: the first element of the data source
    val firstRDD: Int = rdd.first()
    println(firstRDD)
    // take: the first n elements
    val takeRDD: Array[Int] = rdd.take(3)
    println(takeRDD.mkString(","))
    // takeOrdered: the first n elements after sorting
    val rdd1: RDD[Int] = sc.makeRDD(List(4, 2, 3, 1))
    val takeOrderedRDD: Array[Int] = rdd1.takeOrdered(3)
    println(takeOrderedRDD.mkString(","))
    sc.stop()
  }
}
aggregate
aggregate: the zero value participates in the intra-partition computation and also in the inter-partition computation.
aggregateByKey: the zero value only participates in the intra-partition computation.
This is easy to see from the code:
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val aggreRDD: Int = rdd.aggregate(10)(_ + _, _ + _)
For the first partition: 10 + 1 => 11 + 2 => 13
For the second partition: 10 + 3 => 13 + 4 => 17
Across partitions: 10 + 13 => 23 + 17 => 40
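A minimal runnable sketch of the contrast (the object name and the pair RDD keyed by "a" are added here purely for illustration; the numbers match the walkthrough above):
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object rdd_action_aggregate_vs_aggregateByKey {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // two partitions: (1, 2) and (3, 4)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    // zero value 10 is used inside each partition AND when merging partitions:
    // partition 1: 10+1+2=13, partition 2: 10+3+4=17, merge: 10+13+17=40
    println(rdd.aggregate(10)(_ + _, _ + _)) // 40
    // the same values keyed by "a", again in two partitions
    val kvRdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
    // zero value 10 is used only inside each partition:
    // partition 1: 10+1+2=13, partition 2: 10+3+4=17, merge: 13+17=30
    kvRdd.aggregateByKey(10)(_ + _, _ + _).collect().foreach(println) // (a,30)
    sc.stop()
  }
}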
wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object rdd_action_wc8种方法 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    wordcount1(sc)
    sc.stop()
  }
  // groupBy
  def wordcount1(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordgroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val wordcount: RDD[(String, Int)] = wordgroup.mapValues(_.size)
    wordcount.collect().foreach(println)
  }
  // groupByKey
  def wordcount2(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val group: RDD[(String, Iterable[Int])] = newwords.groupByKey()
    val wordcount: RDD[(String, Int)] = group.mapValues(_.size)
    wordcount.collect().foreach(println)
  }
  // reduceByKey
  def wordcount3(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.reduceByKey(_ + _)
    wordcount.collect().foreach(println)
  }
  // aggregateByKey
  def wordcount4(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.aggregateByKey(0)(_ + _, _ + _)
    wordcount.collect().foreach(println)
  }
  // foldByKey
  def wordcount5(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.foldByKey(0)(_ + _)
    wordcount.collect().foreach(println)
  }
  // combineByKey
  def wordcount6(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: RDD[(String, Int)] = newwords.combineByKey(
      v => v,
      (x: Int, y) => x + y,
      (x: Int, y: Int) => x + y
    )
    wordcount.collect().foreach(println)
  }
  // countByKey
  def wordcount7(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val newwords = words.map((_, 1))
    val wordcount: collection.Map[String, Long] = newwords.countByKey()
    println(wordcount.mkString(","))
  }
  // countByValue
  def wordcount8(sc: SparkContext) = {
    val rdd = sc.makeRDD(List("hello spark", "hello scala", "hello java"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordcount: collection.Map[String, Long] = words.countByValue()
    println(wordcount.mkString(","))
  }
}
save
The save operators persist data by writing it out to files in different formats:
rdd.saveAsTextFile()     // the one generally used
rdd.saveAsObjectFile()
rdd.saveAsSequenceFile() // this format requires the data to be of K-V type
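A minimal sketch of the three save methods; the object name and the output directory names are placeholders chosen here for illustration (the directories must not already exist):
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object rdd_action_save {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // saveAsTextFile and saveAsObjectFile work on any RDD
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    rdd.saveAsTextFile("output_text")     // one part file per partition, plain text
    rdd.saveAsObjectFile("output_object") // Java-serialized objects
    // saveAsSequenceFile is only available on K-V RDDs
    val kvRdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2)))
    kvRdd.saveAsSequenceFile("output_sequence")
    sc.stop()
  }
}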
foreach
foreach can be invoked in two different ways:
rdd.foreach(println)
println("///")
rdd.collect().foreach(println)
rdd.foreach(println): in production the data is distributed to different Executors, where the function is applied, so the output has no guaranteed order.
rdd.collect().foreach(println): here foreach is just a loop over an in-memory collection on the Driver, so the output is in order.
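A minimal runnable sketch of the two forms (the object name is a placeholder; the dataset is the same small list used above):
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object rdd_action_foreach {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().set("spark.testing.memory", "2147480000").setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    // runs on the Executors: println executes wherever each partition lives,
    // so the printed order is not guaranteed
    rdd.foreach(println)
    println("///")
    // collect() first pulls all data back to the Driver as an array,
    // then foreach is an ordinary Scala loop over that array, so order is preserved
    rdd.collect().foreach(println)
    sc.stop()
  }
}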



