栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkRDD操作之aggregate

SparkRDD操作之aggregate

SparkRDD操作之aggregate

看代码最好解释

package cn.sparkdemo.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}


object RDDAggregate {
  def main(args: Array[String]): Unit = {
    //1、创建上下文对象
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("Wordcount")
      .setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    //2、加载并行化集合手动指定分区数是2,得到RDD,
    val inputRDD: RDD[Int] = sc.parallelize((1 to 10), 2)
    //3、用aggregate计算元素的和。
    //inputRDD.aggregate(0)(()=>{},()=>{})此处参数零为聚合的初始值,第一个()=>{}为分区内的聚合,第二个()=>{}为分区间的聚合
    val aggregated_value: Int = inputRDD.aggregate(0)(
   	//分区内聚合,tmp为每次聚合后的值作为缓冲变量参与下次聚合,item为分区内的每个具体的值
      (tmp, item) => {
        println(s"分区内,分区号=${TaskContext.getPartitionId()}, tmp=${tmp},item=${item},sum=${tmp + item}")
        tmp + item
      },
      //分区间聚合,item为上面每个分区聚合后的分区内总值
      (tmp, item) => {
        println(s"分区间,分区号=${TaskContext.getPartitionId()}, tmp=${tmp},item=${item},sum=${tmp + item}")
        tmp + item
      }
    )
    //上述可以简化为
    inputRDD.aggregate(0)( _+_, _+_)
    //4、打印结果
    println(aggregated_value)
    //5、如果上述分区内核分区间的聚合函数逻辑一样,则可以简化成一个,成为了fold
    val fold_value: Int = inputRDD.fold(0)(_+_)
    println("fold_value="+fold_value)
    //6、如果上述初始值,没有太大的意义,则可以简化成reduce
    val reduce_value: Int = inputRDD.reduce(_+_)
    println("reduce_value="+reduce_value)
    sc.stop()
  }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/633074.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号