看代码最好解释
package cn.sparkdemo.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object RDDAggregate {
def main(args: Array[String]): Unit = {
//1、创建上下文对象
val sparkConf: SparkConf = new SparkConf()
.setAppName("Wordcount")
.setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2、加载并行化集合手动指定分区数是2,得到RDD,
val inputRDD: RDD[Int] = sc.parallelize((1 to 10), 2)
//3、用aggregate计算元素的和。
//inputRDD.aggregate(0)(()=>{},()=>{})此处参数零为聚合的初始值,第一个()=>{}为分区内的聚合,第二个()=>{}为分区间的聚合
val aggregated_value: Int = inputRDD.aggregate(0)(
//分区内聚合,tmp为每次聚合后的值作为缓冲变量参与下次聚合,item为分区内的每个具体的值
(tmp, item) => {
println(s"分区内,分区号=${TaskContext.getPartitionId()}, tmp=${tmp},item=${item},sum=${tmp + item}")
tmp + item
},
//分区间聚合,item为上面每个分区聚合后的分区内总值
(tmp, item) => {
println(s"分区间,分区号=${TaskContext.getPartitionId()}, tmp=${tmp},item=${item},sum=${tmp + item}")
tmp + item
}
)
//上述可以简化为
inputRDD.aggregate(0)( _+_, _+_)
//4、打印结果
println(aggregated_value)
//5、如果上述分区内核分区间的聚合函数逻辑一样,则可以简化成一个,成为了fold
val fold_value: Int = inputRDD.fold(0)(_+_)
println("fold_value="+fold_value)
//6、如果上述初始值,没有太大的意义,则可以简化成reduce
val reduce_value: Int = inputRDD.reduce(_+_)
println("reduce_value="+reduce_value)
sc.stop()
}
}



