Transformation转换算子1、单Value类型
map算子mapPartitions算子mapPartitionsWithIndex算子flatMap算子glom算子groupBy算子filter算子sample算子distinct算子coalesce算子repartition算子sortBy算子 2、双Value类型算子
交并差算子zip算子 3、Key-Value类型算子
partitionBy算子reduceByKey算子groupByKey算子aggregateByKey算子foldByKey算子combineByKey算子SortByKey算子MapValues算子join算子cogroup算子
Transformation转换算子 1、单Value类型 map算子RDD中的每一个元素依次通过map算子中的匿名函数,形成一个新的RDD
分区不变
package com.hpu.value
import org.apache.spark.{SparkConf, SparkContext}
object Test01_Map {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRdd = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )
listRdd.map(i => {
println("调用算子")
i*2
}).collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
mapPartitions算子
mapPartitions()以分区为单位执行Map,一次性处理一个分区的数据。
package com.hpu.value
import org.apache.spark.{SparkConf, SparkContext}
object Test02_MapPartitons {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRdd = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )
val value = listRdd.mapPartitions( list => {
println("调用算子")
list.map( i => {
println("计算数字")
i * 2
} )
} )
value.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
mapPartitionsWithIndex算子map():每次处理一条数据
mapPartitions():一次处理一个分区的数据。这个分区的数据处理完后,原来RDD中分区的数据才会释放,可能造成OOM
当内存空间较大时,建议使用mapPartitions,效率更高
类似于mapPartitions(),带分区号。
package com.hpu.value
import org.apache.spark.{SparkConf, SparkContext}
object Test03_MapPartitionsWithIndex {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRdd = sc.makeRDD( 1 to 4, 2 )
val value = listRdd.mapPartitionsWithIndex( (index, list) => {
list.map( (index, _) )
} )
value.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
flatMap算子
扁平化,与map算子类似。但是输入一个元素,则输出一个迭代器
package com.hpu.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test04_FlatMap {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD = sc.makeRDD( List( List( 1, 2, 3 ), List( 4, 5, 6 ) ), 2 )
val intRDD = listRDD.flatMap( list => list )
intRDD.collect().foreach(println)
//判断分区
//flatMap不改变分区的情况 保持原分区
intRDD.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
// 对应 (长字符串 ,次数) => (单词,次数),(单词,次数)
val tupleRDD: RDD[(String, Int)] = sc.makeRDD(List(("hello world", 100), ("hello scala", 200)))
tupleRDD.flatMap(tuple=>{
tuple._1.split( " " )
.map((_,tuple._2))
})
//偏函数
tupleRDD.flatMap(tuple => tuple match {
case (line,count) => line.split(" ").map(word => (word,count))
})
tupleRDD.flatMap{
case (line,count) => line.split(" ").map(word => (word,count))
}
//4.关闭sc
sc.stop()
}
}
glom算子
分区转换数组:该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致
package com.hpu.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test05_Glom {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )
val arrayRDD: RDD[Array[Int]] = listRDD.glom()
val result = arrayRDD.map( _.max )
result.collect().foreach(println)
val lineRDD = sc.textFile( "input/1.sql" ,1)
val value = lineRDD.glom()
value.map(array => array.mkString).collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
groupBy算子可从sql文件中处理出sql语句
分组,安装传入数组的返回值进行分组。将相同的key对应的值放入同一个迭代器中
package com.hpu.value
import org.apache.spark.{SparkConf, SparkContext}
object Test06_GroupBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )
val groupRDD = listRDD.groupBy( num => num % 2 )
groupRDD.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
使用GroupBy进行wordcount
package com.hpu.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test07_GroupByWC {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val lineRDD = sc.textFile( "input/2.txt" )
val wordRDD = lineRDD.flatMap( _.split( " " ) )
val tupleRDD: RDD[(String, Iterable[String])] = wordRDD.groupBy( word => word )
tupleRDD.collect().foreach(println)
val result1 = tupleRDD.mapValues( _.size )
tupleRDD.map(tuple => (tuple._1,tuple._2.size)).collect().foreach(println)
println("===========")
result1.collect().foreach(println)
println("============")
//偏函数
tupleRDD.map(tuple => tuple match {
case (word,list) => (word,list.size)
}).collect().foreach(println)
tupleRDD.map{
case (word,list) => (word,list.size)
}.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
filter算子
接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。
不走shuffle
package com.hpu.value
import org.apache.spark.{SparkConf, SparkContext}
object Test08_Filter {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext( conf )
//3.编写代码
val listRDD = sc.makeRDD( List( 1, 3, 2, 4 ), 2 )
val value = listRDD.filter( _ % 2 == 0 )
value.collect().foreach( println )
listRDD.filter( _ % 2 == 0 ).mapPartitionsWithIndex( (num, list) => list.map( (num, _) ) )
.collect()
.foreach( println )
//4.关闭sc
sc.stop()
}
}
sample算子
采样算子:
无放回有放回
object value09_sample {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建一个RDD
val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val sampleRDD: RDD[Int] = dataRDD.sample(false, 0.5)
sampleRDD.collect().foreach(println)
println("----------------------")
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val sampleRDD1: RDD[Int] = dataRDD.sample(true, 2)
sampleRDD1.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
distinct算子
对内部元素去重
用分布式的方式去重比HashSet集合方式相比效率高且不容易OOM
map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
其本质使用了reduceByKey,因此需要走shuffle
package com.hpu.value
import org.apache.spark.{SparkConf, SparkContext}
object Test10_Dis {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext( conf )
//3.编写代码
val intRDD = sc.makeRDD( List( 1, 2, 3, 4, 1, 2 ), 2 )
val value = intRDD.distinct()
value.mapPartitionsWithIndex( (num, list) => list.map( (num, _) ) )
.collect()
.foreach( println )
//4.关闭sc
sc.stop()
}
}
coalesce算子
合并分区
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
默认不执行shuffle
增加分区数时,走shuffle
源码分析:直接合并分区
package com.hpu.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test11_Coalesce {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 5)
val result = listRDD.coalesce( 2 )
result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
println("========================")
val listRDD1 = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )
val result1 = listRDD1.coalesce( 4, true )
result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
repartition算子
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
package com.hpu.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test12_Repartition {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)
val value = listRDD.repartition( 4 )
value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
sortBy算子coalesce和repartition区别
coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
repartition实际上是调用的coalesce,进行shuffle。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。
sortBy需要走shuffle
package com.hpu.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test13_SortBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD(List(7, 8, 5, 2, 9, 1, 2, 3, 4), 2)
// spark的排序能够实现全局有序
// 保证0号分区的数据都小于等于1号分区的数据
// sortBy需要走shuffle
listRDD.sortBy(i => i).mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
println("======")
val value: RDD[(Int, Int)] = sc.makeRDD( List( (2, 1), (1, 2), (1, 1), (2, 2) ) )
val value1 = value.sortBy( _._1 )
value1.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
2、双Value类型算子
交并差算子
// 求交集会打散重新分区 即需要走shuffle
// 默认采用交集中较多的分区
// 求并集
// 并集不走shuffle
// 只是把两个RDD的分区数据拿到一起 分区的个数等于两个RDD分区个数之和
// 求差集
// 需要重写分区 走shuffle 可以自己写分区数
package com.hpu.doublevalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test01_intersection {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
val listRDD1: RDD[Int] = sc.makeRDD(List(5, 6, 3, 4, 2, 1), 2)
val result = listRDD.intersection( listRDD1 )
result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
val result1 = listRDD.union( listRDD1 )
result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
println("===")
val result2 = listRDD.subtract( listRDD1 )
result2.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
zip算子
将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
package com.hpu.doublevalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test02_Zip {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val listRDD1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val value: RDD[(Int, Int)] = listRDD.zip( listRDD1 )
// 将相同分区对应位置的元素拉链到一起 成为一个2元组
// zip只能操作两个rdd具有相同的分区个数和元素个数
value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
3、Key-Value类型算子
partitionBy算子
将RDD[K,V]中的K按照指定Partitioner重新进行分区;
如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。
package com.hpu.keyvalue
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Test01_PartitionBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD = sc.makeRDD( List( 1, 2, 3, 4 ), 2 )
val tupleRDD = listRDD.map( (_, 1) )
val value = tupleRDD.partitionBy( new HashPartitioner( 3 ) )
// 填写分区器 使用分区器对数据重新分区
// 分区器只能对key进行操作
value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
自定义分区
要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Test02_Partitioner {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)
val value: RDD[(Int, Int)] = listRDD.map((_, 1))
value.partitionBy(new MyPartitioner(2))
.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
class MyPartitioner(partitions:Int) extends Partitioner{
override def numPartitions: Int = partitions
// 获取分区号 => 根据元素的key值 判断分给哪个分区
// spark 的分区器只能对key进行分区
override def getPartition(key: Any): Int = {
key match {
case i:Int => i%2
case _ => 0
}
}
}
}
reduceByKey算子
该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test03_ReduceByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext( conf )
//3.编写代码
val listRDD = sc.makeRDD( List( 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4 ), 2 )
val tupleRDD = listRDD.map( (_, 1) )
tupleRDD.collect().foreach(println)
// 使用集合常用函数进行归约
// reduce使用第一个元素作为初始值
// val list = List(1, 1, 1)
// val i: Int = list.reduce((res, elem) => res - elem)
// println(i)
// 默认使用之前的分区个数
// 会自动创建一个hash分区器
// 将相同key的一组元素数据 (1,1,1)
// reduceByKey一样采用第一个元素作为初始值
println("-=============")
val value = tupleRDD.reduceByKey( _ - _ )
value.collect().foreach(println)
// 验证结果
// 需要进行两次归约 一次分区内 一次分区间
// 分区间的第一个元素取决于分区的编号 编号越小越靠前
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1), ("a", 1),("c",1)), 2)
val value2: RDD[(String, Int)] = value1.reduceByKey(_ - _)
value2.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
groupByKey算子
groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test04_GroupByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext( conf )
//3.编写代码
val listRDD: RDD[Int] = sc.makeRDD( List( 1, 2, 3, 4, 1, 2, 3, 4 ), 2 )
val tupleRDD: RDD[(Int, Int)] = listRDD.map( (_, 1) )
val result = tupleRDD.groupBy( _._1 )
result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
println("===================")
val result1 = tupleRDD.groupByKey()
// groupByKey算子
// 只能用于二元组类型的RDD
// 对比groupBy, 聚合之后的value值 是一个集合 里面的元素只包含当前元素的value值
result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
aggregateByKey算子reduceByKey和groupByKey区别
1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预(局部)聚合)操作,返回结果是RDD[K,V]。
2)groupByKey:按照key进行分组,直接进行shuffle。
3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。
按照K处理分区内和分区间逻辑
(1 ) zeroValue(初始值):给每一个分区中的每一种key一个初始值;
(2 ) seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value ;
(3)combOp(分区间):函数用于合并每个分区中的结果。
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test05_aggregateByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)
// 分区内计算需要使用初始值
// 每一个分区的每一个key都会有一个初始值进行累加
val result: RDD[(String, Int)] = value1.aggregateByKey( 10 )( _ + _, _ + _ )
result.collect().foreach(println)
println("=======")
// 求分区内的最大值 分区间进行累加
val result1 = value1.aggregateByKey( 10 )( (res, elem) => math.max( res, elem ), _ + _ )
result1.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
foldByKey算子
分区内和分区间处理逻辑相同的aggregateByKey()
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test06_FoldByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)
// foldByKey可以使用自定义的初始值
// 在进行计算的时候 同样会有预聚合 分区内的结算逻辑和分区间一致
// 只在内区内使用初始值
val result = value1.foldByKey( 10 )( _ + _ )
result.collect().foreach(println)
println("====")
val result1 = value1.foldByKey( 0 )( (res, elem) => math.max( res, elem ) )
result1.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
combineByKey算子
def combineByKey[C](
createCombiner: V => C,
mergevalue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
(1)createCombiner(转换数据的结构): combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergevalue(分区内): 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergevalue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners(分区间): 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
求平均数
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test07_CombineByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)
// 对上面的元素进行归约 (单词,("product",21))
val result = value1.combineByKey(
i => ("product", i),
(res: (String, Int), elem: Int) => (res._1, res._2 * elem),
(res1: (String, Int), elem: (String, Int)) => (elem._1, res1._2 * elem._2)
)
result.collect().foreach(println)
println("========================")
// 创建一个pairRDD,根据key计算每种key的平均值。
// (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
val list: List[(String, Int)] = List(
("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val tupleRDD: RDD[(String, Int)] = sc.makeRDD(list)
val result1 = tupleRDD.combineByKey(
// 将(a,88) => (a,(88,1)) 因为算子已经内部按照key聚合了 所以写的时候只写value
i => (i, 1),
// 分区内累加 将相同分区相同key的值合并 (88,1)和91 => (179,2)
(res: (Int, Int), elem: Int) => (res._1 + elem, res._2 + 1),
// 分区间累加 将不同分区相同key的二元组合并在一起 (179,2) 和 (95,1) => (274,3)
(res: (Int, Int), elem: (Int, Int)) => (res._1 + elem._1, res._2 + elem._2)
)
result1.collect().foreach(println)
result1.mapValues(tuple => tuple match {
case (sum:Int,count:Int) => sum.toDouble/count
}).collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
WordCount案例
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
/// 创建Spark上下文对象
val sc = new SparkContext(conf)
// 读取文件种的数据
val textRDD: RDD[String] = sc.textFile("input/2.txt",2)
// 扁平化, 按照" "来切割
val wordRDD: RDD[String] = textRDD.flatMap {
case x => {
x.split(" ")
}
}
// 转换结构
val mapRDD: RDD[(String, Int)] = wordRDD.map {
case x => {
(x, 1)
}
}
val combinRDD3: RDD[(String, Int)] = mapRDD.combineByKey(
i => i,
(acc: Int, v:Int) => acc + v,
(acc1: Int, acc2: Int) => (acc1 + acc2)
)
combinRDD3.collect().foreach(println)
}
}
SortByKey算子reduceByKey、foldByKey、aggregateByKey、combineByKey
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test08_SortByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1 = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
// 默认使用range分区器
// 固定使用二元组中的key进行排序 不会使用value
val result = value1.sortByKey( false )
val result1: RDD[(Int, String)] = value1.sortBy( _._1 )
val result2: RDD[(Int, String)] = value1.map( {
case (key, value) => (key, (key, value))
} ).sortByKey().map( _._2 )
result1.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
MapValues算子
针对于(K,V)形式的类型只对V进行操作
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test09_MapValues {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)
val result = value1.mapValues( _ * 2 )
result.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}
join算子
等同于sql里的内连接,关联上的要,关联不上的舍弃
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test10_Join {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 12), ("b", 21)), 4)
val value2: RDD[(String, Int)] = sc.makeRDD(
List(("a", 11), ("b", 17), ("c", 31), ("d", 22)), 4)
val result: RDD[(String, (Int, Int))] = value1.join( value2 )
// 将相同key合并
// join走shuffle 使用hash分区器
// 尽量保证join之前key是不重复的 如果有重复 会造成最终结果也是重复的
result.mapPartitionsWithIndex((num,list)=>list.map((num,_)))
.collect()
.foreach(println)
//4.关闭sc
sc.stop()
}
}
cogroup算子
类似于sql的全连接,但是在同一个RDD中对key聚合
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
package com.hpu.keyvalue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test11_Cogroup {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName( "WordCount" ).setMaster( "local[*]" )
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.编写代码
val value1: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4)
val value2: RDD[(String, Int)] = sc.makeRDD(
List(("a", 10), ("b", 7), ("a", 11), ("d", 21)), 4)
val result: RDD[(String, (Iterable[Int], Iterable[Int]))] = value1.cogroup( value2 )
result.collect().foreach(println)
//4.关闭sc
sc.stop()
}
}



