栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Transformation转换算子

Transformation转换算子

文章目录

Transformation转换算子1、单Value类型

map算子mapPartitions算子mapPartitionsWithIndex算子flatMap算子glom算子groupBy算子filter算子sample算子distinct算子coalesce算子repartition算子sortBy算子 2、双Value类型算子

交并差算子zip算子 3、Key-Value类型算子

partitionBy算子reduceByKey算子groupByKey算子aggregateByKey算子foldByKey算子combineByKey算子SortByKey算子MapValues算子join算子cogroup算子

Transformation转换算子 1、单Value类型 map算子

RDD中的每一个元素依次通过map算子中的匿名函数,形成一个新的RDD

分区不变

package com.hpu.value

import org.apache.spark.{SparkConf, SparkContext}


object Test01_Map {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRdd = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )

    listRdd.map(i => {
      println("调用算子")
      i*2
    }).collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
mapPartitions算子

mapPartitions()以分区为单位执行Map,一次性处理一个分区的数据。

package com.hpu.value

import org.apache.spark.{SparkConf, SparkContext}


object Test02_MapPartitons {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码

    val listRdd = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )

    val value = listRdd.mapPartitions( list => {
      println("调用算子")
      list.map( i => {
        println("计算数字")
        i * 2
      } )
    } )

    value.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}

map():每次处理一条数据

mapPartitions():一次处理一个分区的数据。这个分区的数据处理完后,原来RDD中分区的数据才会释放,可能造成OOM

当内存空间较大时,建议使用mapPartitions,效率更高

mapPartitionsWithIndex算子

类似于mapPartitions(),带分区号。

package com.hpu.value

import org.apache.spark.{SparkConf, SparkContext}


object Test03_MapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码

    val listRdd = sc.makeRDD( 1 to 4, 2 )

    val value = listRdd.mapPartitionsWithIndex( (index, list) => {
      list.map( (index, _) )
    } )

    value.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
flatMap算子

扁平化,与map算子类似。但是输入一个元素,则输出一个迭代器

package com.hpu.value

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test04_FlatMap {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD = sc.makeRDD( List( List( 1, 2, 3 ), List( 4, 5, 6 ) ), 2 )

    val intRDD = listRDD.flatMap( list => list )

    intRDD.collect().foreach(println)

    //判断分区
    //flatMap不改变分区的情况  保持原分区
    intRDD.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)


    // 对应 (长字符串 ,次数)  => (单词,次数),(单词,次数)
    val tupleRDD: RDD[(String, Int)] = sc.makeRDD(List(("hello world", 100), ("hello scala", 200)))

    tupleRDD.flatMap(tuple=>{
      tuple._1.split( " " )
        .map((_,tuple._2))
    })

    //偏函数
    tupleRDD.flatMap(tuple => tuple match {
      case (line,count) => line.split(" ").map(word => (word,count))
    })

    tupleRDD.flatMap{
      case (line,count) => line.split(" ").map(word => (word,count))
    }

    //4.关闭sc
    sc.stop()
  }
}
glom算子

分区转换数组:该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致

package com.hpu.value

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test05_Glom {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码

    val listRDD = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )

    val arrayRDD: RDD[Array[Int]] = listRDD.glom()

    val result = arrayRDD.map( _.max )

    result.collect().foreach(println)

    val lineRDD = sc.textFile( "input/1.sql" ,1)

    val value = lineRDD.glom()

    value.map(array => array.mkString).collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}

可从sql文件中处理出sql语句

groupBy算子

分组,安装传入数组的返回值进行分组。将相同的key对应的值放入同一个迭代器中

package com.hpu.value

import org.apache.spark.{SparkConf, SparkContext}


object Test06_GroupBy {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码

    val listRDD = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )

    val groupRDD = listRDD.groupBy( num => num % 2 )

    groupRDD.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}

使用GroupBy进行wordcount

package com.hpu.value

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test07_GroupByWC {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val lineRDD = sc.textFile( "input/2.txt" )

    val wordRDD = lineRDD.flatMap( _.split( " " ) )

    val tupleRDD: RDD[(String, Iterable[String])] = wordRDD.groupBy( word => word )

    tupleRDD.collect().foreach(println)

    val result1 = tupleRDD.mapValues( _.size )



    tupleRDD.map(tuple => (tuple._1,tuple._2.size)).collect().foreach(println)

    println("===========")
    result1.collect().foreach(println)

    println("============")

    //偏函数
    tupleRDD.map(tuple => tuple match {
      case (word,list) => (word,list.size)
    }).collect().foreach(println)

    tupleRDD.map{
      case (word,list) => (word,list.size)
    }.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
filter算子

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

不走shuffle

package com.hpu.value

import org.apache.spark.{SparkConf, SparkContext}


object Test08_Filter {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext( conf )

    //3.编写代码
    val listRDD = sc.makeRDD( List( 1, 3, 2, 4 ), 2 )

    val value = listRDD.filter( _ % 2 == 0 )

    value.collect().foreach( println )

    listRDD.filter( _ % 2 == 0 ).mapPartitionsWithIndex( (num, list) => list.map( (num, _) ) )
      .collect()
      .foreach( println )


    //4.关闭sc
    sc.stop()
  }
}
sample算子

采样算子:

无放回有放回

object value09_sample {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 创建一个RDD
        val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))

        // 抽取数据不放回(伯努利算法)
        // 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
        // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
        // 第一个参数:抽取的数据是否放回,false:不放回
        // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
        // 第三个参数:随机数种子
        val sampleRDD: RDD[Int] = dataRDD.sample(false, 0.5)
        sampleRDD.collect().foreach(println)

        println("----------------------")

        // 抽取数据放回(泊松算法)
        // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
        // 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
        // 第三个参数:随机数种子
        val sampleRDD1: RDD[Int] = dataRDD.sample(true, 2)
        sampleRDD1.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}
distinct算子

对内部元素去重

用分布式的方式去重比HashSet集合方式相比效率高且不容易OOM

map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

其本质使用了reduceByKey,因此需要走shuffle

package com.hpu.value

import org.apache.spark.{SparkConf, SparkContext}


object Test10_Dis {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext( conf )

    //3.编写代码

    val intRDD = sc.makeRDD( List( 1, 2, 3, 4, 1, 2 ), 2 )

    val value = intRDD.distinct()

    value.mapPartitionsWithIndex( (num, list) => list.map( (num, _) ) )
      .collect()
      .foreach( println )

    //4.关闭sc
    sc.stop()
  }
}

coalesce算子

合并分区

缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

默认不执行shuffle

增加分区数时,走shuffle

源码分析:直接合并分区

package com.hpu.value

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test11_Coalesce {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 5)

    val result = listRDD.coalesce( 2 )

    result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    println("========================")

    val listRDD1 = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )

    val result1 = listRDD1.coalesce( 4, true )

    result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)
    //4.关闭sc
    sc.stop()
  }
}
repartition算子

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

package com.hpu.value

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test12_Repartition {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)

    val value = listRDD.repartition( 4 )

    value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    //4.关闭sc
    sc.stop()
  }
}

coalesce和repartition区别

coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

repartition实际上是调用的coalesce,进行shuffle。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
   coalesce(numPartitions, shuffle = true)
 }
sortBy算子

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。

sortBy需要走shuffle

package com.hpu.value

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test13_SortBy {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD(List(7, 8, 5, 2, 9, 1, 2, 3, 4), 2)
    // spark的排序能够实现全局有序
    // 保证0号分区的数据都小于等于1号分区的数据
    // sortBy需要走shuffle
    listRDD.sortBy(i => i).mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    println("======")
    val value: RDD[(Int, Int)] = sc.makeRDD( List( (2, 1), (1, 2), (1, 1), (2, 2) ) )
    val value1 = value.sortBy( _._1 )
    value1.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
2、双Value类型算子 交并差算子

// 求交集会打散重新分区 即需要走shuffle
// 默认采用交集中较多的分区

// 求并集
// 并集不走shuffle
// 只是把两个RDD的分区数据拿到一起 分区的个数等于两个RDD分区个数之和

// 求差集
// 需要重写分区 走shuffle 可以自己写分区数

package com.hpu.doublevalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test01_intersection {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)

    val listRDD1: RDD[Int] = sc.makeRDD(List(5, 6, 3, 4, 2, 1), 2)

    val result = listRDD.intersection( listRDD1 )

    result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    val result1 = listRDD.union( listRDD1 )
    result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    println("===")

    val result2 = listRDD.subtract( listRDD1 )

    result2.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)
    //4.关闭sc
    sc.stop()
  }
}
zip算子

将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

package com.hpu.doublevalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test02_Zip {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    val listRDD1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    val value: RDD[(Int, Int)] = listRDD.zip( listRDD1 )
    // 将相同分区对应位置的元素拉链到一起  成为一个2元组
    // zip只能操作两个rdd具有相同的分区个数和元素个数
    value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)
    //4.关闭sc
    sc.stop()
  }
}
3、Key-Value类型算子 partitionBy算子

将RDD[K,V]中的K按照指定Partitioner重新进行分区;

如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。

package com.hpu.keyvalue

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}


object Test01_PartitionBy {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码

    val listRDD = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )

    val tupleRDD = listRDD.map( (_, 1) )

    val value = tupleRDD.partitionBy( new HashPartitioner( 3 ) )
    // 填写分区器  使用分区器对数据重新分区
    // 分区器只能对key进行操作
    value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    //4.关闭sc
    sc.stop()
  }
}

自定义分区

要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。

(1)numPartitions: Int:返回创建出来的分区数。

(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}


object Test02_Partitioner {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)

    val value: RDD[(Int, Int)] = listRDD.map((_, 1))

    value.partitionBy(new MyPartitioner(2))
        .mapPartitionsWithIndex((num,list)=>list.map((num,_)))
                .collect()
                .foreach(println)



    //4.关闭sc
    sc.stop()
  }

  class MyPartitioner(partitions:Int) extends Partitioner{
    override def numPartitions: Int = partitions
    // 获取分区号 => 根据元素的key值  判断分给哪个分区
    // spark 的分区器只能对key进行分区
    override def getPartition(key: Any): Int = {
      key match {
        case i:Int => i%2
        case _ => 0
      }
    }
  }
}
reduceByKey算子

该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test03_ReduceByKey {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext( conf )

    //3.编写代码

    val listRDD = sc.makeRDD( List( 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4 ), 2 )
    val tupleRDD = listRDD.map( (_, 1) )

    tupleRDD.collect().foreach(println)
    // 使用集合常用函数进行归约
    // reduce使用第一个元素作为初始值
    //    val list = List(1, 1, 1)
    //    val i: Int = list.reduce((res, elem) => res - elem)
    //    println(i)

    // 默认使用之前的分区个数
    // 会自动创建一个hash分区器
    // 将相同key的一组元素数据   (1,1,1)
    // reduceByKey一样采用第一个元素作为初始值

    println("-=============")
    val value = tupleRDD.reduceByKey( _ - _ )

    value.collect().foreach(println)
    // 验证结果
    // 需要进行两次归约  一次分区内  一次分区间
    // 分区间的第一个元素取决于分区的编号   编号越小越靠前
    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1), ("a", 1),("c",1)), 2)

    val value2: RDD[(String, Int)] = value1.reduceByKey(_ - _)
    value2.collect().foreach(println)
    //4.关闭sc
    sc.stop()
  }
}
groupByKey算子

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test04_GroupByKey {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext( conf )

    //3.编写代码
    val listRDD: RDD[Int] = sc.makeRDD( List( 1, 2, 3, 4, 1, 2, 3, 4 ), 2 )

    val tupleRDD: RDD[(Int, Int)] = listRDD.map( (_, 1) )

    val result = tupleRDD.groupBy( _._1 )
    result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)
    println("===================")
    val result1 = tupleRDD.groupByKey()

    // groupByKey算子
    // 只能用于二元组类型的RDD
    // 对比groupBy, 聚合之后的value值 是一个集合  里面的元素只包含当前元素的value值
    result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)


    //4.关闭sc
    sc.stop()
  }
}

reduceByKey和groupByKey区别

1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预(局部)聚合)操作,返回结果是RDD[K,V]。

2)groupByKey:按照key进行分组,直接进行shuffle。

3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。

aggregateByKey算子

按照K处理分区内和分区间逻辑

(1 ) zeroValue(初始值):给每一个分区中的每一种key一个初始值;

(2 ) seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value ;

(3)combOp(分区间):函数用于合并每个分区中的结果。

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test05_aggregateByKey {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码

    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)

    // 分区内计算需要使用初始值
    // 每一个分区的每一个key都会有一个初始值进行累加

    val result: RDD[(String, Int)] = value1.aggregateByKey( 10 )( _ + _, _ + _ )

    result.collect().foreach(println)
    println("=======")

    // 求分区内的最大值  分区间进行累加
    val result1 = value1.aggregateByKey( 10 )( (res, elem) => math.max( res, elem ), _ + _ )
    result1.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}

foldByKey算子

分区内和分区间处理逻辑相同的aggregateByKey()

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test06_FoldByKey {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)

    // foldByKey可以使用自定义的初始值
    // 在进行计算的时候  同样会有预聚合  分区内的结算逻辑和分区间一致
    // 只在内区内使用初始值

    val result = value1.foldByKey( 10 )( _ + _ )

    result.collect().foreach(println)

    println("====")

    val result1 = value1.foldByKey( 0 )( (res, elem) => math.max( res, elem ) )

    result1.collect().foreach(println)
    //4.关闭sc
    sc.stop()
  }
}
combineByKey算子
def combineByKey[C](
      createCombiner: V => C,
      mergevalue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)]
(1)createCombiner(转换数据的结构): combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergevalue(分区内): 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergevalue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners(分区间): 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

求平均数

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test07_CombineByKey {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)

    // 对上面的元素进行归约  (单词,("product",21))
    val result = value1.combineByKey(
      i => ("product", i),
      (res: (String, Int), elem: Int) => (res._1, res._2 * elem),
      (res1: (String, Int), elem: (String, Int)) => (elem._1, res1._2 * elem._2)
    )
    result.collect().foreach(println)

    println("========================")

    // 创建一个pairRDD,根据key计算每种key的平均值。
    // (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
    val list: List[(String, Int)] = List(
      ("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))

    val tupleRDD: RDD[(String, Int)] = sc.makeRDD(list)

    val result1 = tupleRDD.combineByKey(
      // 将(a,88) => (a,(88,1)) 因为算子已经内部按照key聚合了  所以写的时候只写value
      i => (i, 1),
      // 分区内累加   将相同分区相同key的值合并  (88,1)和91  => (179,2)
      (res: (Int, Int), elem: Int) => (res._1 + elem, res._2 + 1),
      // 分区间累加   将不同分区相同key的二元组合并在一起  (179,2) 和 (95,1) => (274,3)
      (res: (Int, Int), elem: (Int, Int)) => (res._1 + elem._1, res._2 + elem._2)
    )
    result1.collect().foreach(println)

    result1.mapValues(tuple => tuple match {
      case (sum:Int,count:Int) => sum.toDouble/count
    }).collect().foreach(println)


    //4.关闭sc
    sc.stop()
  }
}

WordCount案例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")

    /// 创建Spark上下文对象
    val sc = new SparkContext(conf)

    // 读取文件种的数据
    val textRDD: RDD[String] = sc.textFile("input/2.txt",2)

    // 扁平化, 按照" "来切割
    val wordRDD: RDD[String] = textRDD.flatMap {
      case x => {
        x.split(" ")
      }
    }

    // 转换结构
    val mapRDD: RDD[(String, Int)] = wordRDD.map {
      case x => {
        (x, 1)
      }
    }
    val combinRDD3: RDD[(String, Int)] = mapRDD.combineByKey(
      i => i,
      (acc: Int, v:Int) => acc + v,
      (acc1: Int, acc2: Int) => (acc1 + acc2)
    )
    combinRDD3.collect().foreach(println)

  }
}

reduceByKey、foldByKey、aggregateByKey、combineByKey

SortByKey算子

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test08_SortByKey {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val value1 = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    // 默认使用range分区器
    // 固定使用二元组中的key进行排序   不会使用value
    val result = value1.sortByKey( false )


    val result1: RDD[(Int, String)] = value1.sortBy( _._1 )

    val result2: RDD[(Int, String)] = value1.map( {
      case (key, value) => (key, (key, value))
    } ).sortByKey().map( _._2 )

    result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
MapValues算子

针对于(K,V)形式的类型只对V进行操作

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test09_MapValues {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)
    val result = value1.mapValues( _ * 2 )

    result.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
join算子

等同于sql里的内连接,关联上的要,关联不上的舍弃

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test10_Join {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 12), ("b", 21)), 4)

    val value2: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 11), ("b", 17), ("c", 31), ("d", 22)), 4)

    val result: RDD[(String, (Int, Int))] = value1.join( value2 )
    // 将相同key合并
    // join走shuffle   使用hash分区器
    // 尽量保证join之前key是不重复的  如果有重复  会造成最终结果也是重复的
    result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
            .collect()
            .foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
cogroup算子

类似于sql的全连接,但是在同一个RDD中对key聚合

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

package com.hpu.keyvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object Test11_Cogroup {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc = new SparkContext(conf)

    //3.编写代码
    val value1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)

    val value2: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 10), ("b", 7), ("a", 11), ("d", 21)), 4)

    val result: RDD[(String, (Iterable[Int], Iterable[Int]))] = value1.cogroup( value2 )

    result.collect().foreach(println)

    //4.关闭sc
    sc.stop()
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734805.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号