1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
RDD转换算子根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型
2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
2. Value型Transformation算子| map | mapPartitions | mapPartitionsWithIndex | flatMap |
| glom | groupBy | filter | sample |
| distinct | coalesce | repartition | sortBy |
面试题:请列举Spark的transformation算子(不少于8个),并简述功能(重点)
1)map (func):返回一个新的 RDD,该 RDD 由每一个输入元素经过func 函数转换后组成.
2)mapPartitions(func):类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的RDD上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而 mapPartitions 被调用M 次,一个函数一次处理所有分区。
3)reduceByKey(func,[numTask]):在一个(K,V)的 RDD 上调用,返回一个(K,V)的RDD,使用定的 reduce 函数,将相同key 的值聚合到一起,reduce 任务的个数可以通过第二个可选的参数来设置。
4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv 对的 RDD 中,,按 key 将value 进行分组合并,合并时,将每个 value 和初始值作为 seq 函数的参数,进行计算,返回的结果作为一个新的 kv 对,然后再将结果按照 key 进行合并,最后将每个分组的 value 传递给 combine 函数进行计算(先将前两个value 进行计算,将返回结果和下一个 value 传给 combine 函数,以此类推),将 key 与计算结果作为一个新的 kv 对输出。
5)combineByKey(createCombiner: V=>C, mergevalue: (C, V) =>C, mergeCombiners: (C, C) =>C):
对相同 K,把 V 合并成一个集合。
1.createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作 createCombiner()的函数来创建那个键对应的累加器的初始值
2.mergevalue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergevalue()方法将该键的累加器对应的当前值与这个新的值进行合并
3.mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的mergeCombiners() 方法将各个分区的结果进行合并。
➢ 函数签名
def map[U: ClassTag](f: T => U): RDD[U]
➢ 函数说明
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
需求1:将数组中每个数乘以2
package com.meiyuan.bigdata.spark.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCore")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
// 【1,2】,【3,4】
rdd.saveAsTextFile("output")
val mapRDD = rdd.map(_*2)
// 【2,4】,【6,8】
mapRDD.saveAsTextFile("output1")
mapRDD.collect().foreach(println)
sc.stop()
}
}
总结:
① rdd的计算一个分区内的数据是一个一个执行逻辑,只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
② 分区内数据的执行是有序的。不同分区数据计算是无序的(并行无法确定先后)
③ 一个partition一个task发给Executor不会跨区执行任务
④ 分区不变, 数据转换之后所在的分区位置也不变
❖ 小功能:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - map
val rdd = sc.textFile("data/apache.log")
// 数据格式:IP地址+用户ID+访问时间戳+时区+请求方式
// 这里面的String就是请求资源的路径
val mapRDD: RDD[String] = rdd.map(
line => {
val data = line.split(" ")
data(6) // 6表示文件位置6(从0开始数)
}
)
mapRDD.collect().foreach(println)
sc.stop()
}
}
2) mapPartitions
➢ 函数签名
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
需求:分区为2,下面数据一共操作几次?
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - mapPartitions
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 一个分区数据全部拿到做操作;通过迭代器得到迭代器
// 内存中的操作, 类似于批处理, 1和2一起操作, 3和4一起操作; 2次
val mpRDD: RDD[Int] = rdd.mapPartitions(
iter => {
// 打印两次
println(">>>>>>>>>>")
iter.map(_ * 2)
}
)
mpRDD.collect().foreach(println)
sc.stop()
}
}
总结:
① 以分区为单位进行数据转换操作, 而不是一个一个操作
② 但是会将整个分区的数据加载到内存进行引用
③ 如果处理完的数据是不会被释放掉,存在对象的引用
④ 在内存较小,数据量较大的场合下,容易出现内存溢出。
❖ 小功能:获取每个数据分区的最大值
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - mapPartitions
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
// 每个分区最大值【2】,【4】
val mpRDD = rdd.mapPartitions(
iter => {
// 迭代器得到迭代器,求迭代器最大值是一个值所以需要包起来变成迭代器
List(iter.max).iterator
}
)
mpRDD.collect().foreach(println)
sc.stop()
}
}
面试题:map和mapPartitions的区别?
➢ 数据处理角度
Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
➢ 功能的角度
Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
➢ 性能的角度
Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。
➢ 函数签名
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
需求1:求数字的分区
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - mapPartitionsWithIndex
val rdd = sc.makeRDD(List(1,2,3,4))
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
// 求数字的分区(分区号码, 数字)(一共8个分区)
// 1, 2, 3, 4
//(1,1)(3,2),(5,3),(7,4)
iter.map(
num => {
(index, num)
}
)
}
)
mpiRDD.collect().foreach(println)
sc.stop()
}
}
❖ 小功能:获取第二个数据分区的数据
4) flatMap➢ 函数签名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
➢ 函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
需求:拆词
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - flatMap
val rdd: RDD[String] = sc.makeRDD(List(
"Hello Scala", "Hello Spark"
))
// 一个字符串拆成一个一个单词
// 简约写法 val flatRDD: RDD[String] = rdd.flatMap(_.split(" "))
val flatRDD: RDD[String] = rdd.flatMap(
s => {
s.split(" ")
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
❖ 小功能:将List(List(1,2),3,List(4,5))进行扁平化操作
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - flatMap
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
val flatRDD = rdd.flatMap(
data => {
data match {
case list:List[_] => list
case dat => List(dat)
}
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
5) glom
➢ 函数签名
def glom(): RDD[Array[T]]
➢ 函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
List => Int 整体拆成个体flatMap操作
Int => Array 相反的操作,个体变成整体,glom操作
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - glom
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
val glomRDD: RDD[Array[Int]] = rdd.glom()
// data是两个分区的两个数组, 所以需要循环
// 1,2
// 3,4
glomRDD.collect().foreach(data => println(data.mkString(",")))
sc.stop()
}
}
❖ 小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - glom
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
// 【2】,【4】 分区内取最大值
// 【6】 分区之间求和
// 分区数据变成数组
val glomRDD: RDD[Array[Int]] = rdd.glom()
// 求每个分区最大值
val maxRDD: RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum) // 采集回来也是数组
sc.stop()
}
}
6) groupBy
➢ 函数签名
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
➢ 函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - groupBy
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
// 相同的key值的数据会放置在一个组中
def groupFunction(num:Int) = {
num % 2
}
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
// (0,CompactBuffer(2, 4))
// (1,CompactBuffer(1, 3))
groupRDD.collect().foreach(println)
sc.stop()
}
}
❖ 小功能:将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - groupBy
val rdd = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2)
// 分组和分区没有必然的关系
val groupRDD = rdd.groupBy(_.charAt(0))
// (h,CompactBuffer(hive, hbase))
// (H,CompactBuffer(Hello, Hadoop))
groupRDD.collect().foreach(println)
sc.stop()
}
}
❖ 小功能:从服务器日志数据apache.log中获取每个时间段访问量。
parse函数是把字符串转换成日期
format函数是把日期转换成字符串
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - groupBy
val rdd = sc.textFile("data/apache.log")
// 从服务器日志数据中获取每个时间段的访问量
val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
line => {
val data = line.split(" ")
val time = data(3)
// 格式化 - 字符串格式需要统一
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val date: Date = sdf.parse(time)
// 字符串格式需要统一
val sdf1 = new SimpleDateFormat("HH")
val hour: String = sdf1.format(date) // 时间段
(hour, 1) // 时间点出现了一次
}
).groupBy(_._1)
timeRDD.map{
case ( hour, iter ) => {
(hour, iter.size)
}
}.collect.foreach(println)
sc.stop()
}
}
❖ 小功能:WordCount (看part5)
7) filter➢ 函数签名
def filter(f: T => Boolean): RDD[T]
➢ 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
需求:奇数
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - filter
val rdd = sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)
filterRDD.collect().foreach(println)
sc.stop()
}
}
❖ 小功能:从服务器日志数据apache.log中获取2015年5月17日的请求路径
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - filter
val rdd = sc.textFile("data/apache.log")
// 从服务器日志数据中获取2015年5月17日的请求路径
// 不用map是因为不是光返回time, 还要请求路径数据
rdd.filter(
line => {
val data = line.split(" ")
val time = data(3)
time.startsWith("17/05/2015")
}
).collect().foreach(println)
sc.stop()
}
}
8) sample
➢ 函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
➢ 函数说明
根据指定的规则从数据集中抽取数据
sample算子需要传递三个参数
1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
2. 第二个参数表示,
如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
3. 第三个参数表示,抽取数据时随机算法的种子
如果不传递第三个参数,那么使用的是当前系统时间
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
// 结果 1,5,5,6,6,6,6,7,7,7,8,8,9,9,9,9 随机的
println(rdd.sample(
true,
2
//1
).collect().mkString(","))
sc.stop()
}
}
9) distinct
➢ 函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
将数据集中重复的数据去重
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark09_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - filter
val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// reduceByKey聚合规则根据key聚合不管第二个,直接返回第一个
// (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
// (1, null)(1, null)(1, null)
// (null, null) => null
// (1, null) => 1
val rdd1: RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
sc.stop()
}
}
总结:
scala底层是用HashSet去重,而RDD是用分布式处理方式去重
10) coalesce➢ 函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
➢ 函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)
// val newRDD: RDD[Int] = rdd.coalesce(2) // 默认不shuffle
val newRDD: RDD[Int] = rdd.coalesce(2,true)
newRDD.saveAsTextFile("output")
sc.stop()
}
}
总结:
① coalesce方法默认情况下不会将分区的数据打乱重新组合,这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
② 如果想要让数据均衡,可以进行shuffle处理, 第二个参数为true => 数据打乱没有规律,不会是【1,2,3】【4,5,6】
➢ 函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark11_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - filter
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)
//val newRDD: RDD[Int] = rdd.coalesce(3, true)
// 扩大分区, 均匀分配数据变少, 并行计算能力增加
val newRDD: RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("output")
sc.stop()
}
}
总结:
① coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。所以如果想要实现扩大分区的效果,需要使用shuffle操作
② spark提供了一个简化的操作
缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
面试题:Repartition和Coalesce 的关系与区别,能简单说说吗?
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
➢ 函数签名
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
➢ 函数说明
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程
需求: list排序
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - sortBy 根据制定规则排序, 默认分区数量不变
val rdd = sc.makeRDD(List(6,2,4,5,3,1), 2)
val newRDD: RDD[Int] = rdd.sortBy(num=>num)
// 分2区,数据从小到大排序【1,2,3】【4,5,6】
newRDD.saveAsTextFile("output")
sc.stop()
}
}
需求:tuple对按照第一个元素排序
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark12_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - sortBy
val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
// val newRDD = rdd.sortBy(t=>t._1) 按字符串排序11和2比较1在2前面
// 按数字排序,false是降序
val newRDD = rdd.sortBy(t=>t._1.toInt, false)
// (11,2)
// (2,3)
// (1,1)
newRDD.collect().foreach(println)
sc.stop()
}
}
总结:
① sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
② sortBy默认情况下,不会改变分区。但是中间存在shuffle操作
| intersection | union | subtract | zip |
➢ 函数签名
def intersection(other: RDD[T]): RDD[T]
➢ 函数说明
对源RDD和参数RDD求交集后返回一个新的RDD
➢ 函数签名
def union(other: RDD[T]): RDD[T]
➢ 函数说明
对源RDD和参数RDD求并集后返回一个新的RDD
➢ 函数签名
def subtract(other: RDD[T]): RDD[T]
➢ 函数说明
以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集
➢ 函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
➢ 函数说明
将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
val rdd1 = sc.makeRDD(List(1,2,3,4),2)
val rdd2 = sc.makeRDD(List(3,4,5,6),2)
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
总结:
① 交集(intersection),并集(union)和差集(subtract)要求两个数据源数据类型保持一致
② 拉链(zip)操作两个数据源的类型可以不一致
③ Can't zip RDDs with unequal numbers of partitions: List(2, 4) - 两个数据源要求分区数量要保持一致
val rdd1 = sc.makeRDD(List(1,2,3,4),2)
val rdd2 = sc.makeRDD(List(3,4,5,6),4)
④ Can only zip RDDs with same number of elements in each partition - 两个数据源要求分区中数据数量保持一致
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2)
val rdd2 = sc.makeRDD(List(3,4,5,6),2)
| partitionBy | reduceByKey | groupByKey | aggregateByKey | foldByKey |
| combineByKey | sortByKey | join | leftOuterJoin | cogroup |
➢ 函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
➢ 函数说明
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
partitionBy方法是 PairRDDFunctions提供的,这里从 RDD => PairRDDFunctions 利用开发OCP原则,通过隐式转换 (二次编译), 伴生对象有一个方法rddToPairRDDFunctions 把RDD变为PairRDDFunctions
区分partitionBy和coalesce和repartition:
① partitionBy根据指定的分区规则对数据进行重分区
② 区分coalesce和repartition是分区数量的改变
③ group by只对数据分组和分区无关
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Spark14_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd = sc.makeRDD(List(1,2,3,4),2)
// int => tuple 对偶元组形成键值可以使用
val mapRDD:RDD[(Int, Int)] = rdd.map((_,1))
val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
// 当前就返回他自己, 所以这一步没有意义
newRDD.partitionBy(new HashPartitioner(2))
newRDD.saveAsTextFile("output")
sc.stop()
}
}
总结:
如果重分区的分区器和当前RDD的分区器一样怎么办? 在底层源码判断了当前的分区器和传入的分区器类型和分区数量。首先匹配分区的类型是否相同,如果相同判断分区数量是否相同, 如果类型和数量都相同,返回它自己。
Spark是否有其他分区器?RangePartitioner 排序使用
如果想要按自己的方法进行数据分区怎么办?可以自己写分区器, 改变数据存放的位置
➢ 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
➢ 函数说明
可以将数据按照相同的Key对Value进行聚合
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark15_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))
val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {
println(s"x = ${x}, y = ${y}")
x + y
} )
// x = 1, y = 2
// x = 3, y = 3
// (a,6)
// (b,4)
reduceRDD.collect().foreach(println)
sc.stop()
}
}
总结:
① reduceByKey中如果key的数据只有一个,是不会参与运算的。
② reduceByKey相同key的value两两聚合,reduceByKey支持分区内预聚合功能(落盘之前就聚合在一起),可以有效减少shuffle时落盘的数据量,提升shuffle的性能。
分区内 (同一个分区内数据做聚合),分区间 (落盘之后就是多个分区间做聚合),分区内和分区间计算规则是相同的。
③ 如果分区内和分区间计算规则不相同可以用aggregatedByKey,比如
【1,2】,【3,4】 => 分区内求最大值【2】,【4】 分区间求和【6】
❖ 小功能:WordCount
19) groupByKey➢ 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
➢ 函数说明
将数据源的数据根据key对value进行分组
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark16_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
// (a,CompactBuffer(1, 2, 3))
// (b,CompactBuffer(4))
groupRDD.collect().foreach(println)
// (groupBy(tuple._1)按tuple的第一个元素分组
val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
// (a,CompactBuffer((a,1), (a,2), (a,3)))
// (b,CompactBuffer((b,4)))
groupRDD1.collect().foreach(println)
sc.stop()
}
}
总结:
groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
元组中的第一个元素就是key,key是确定的, value单独拿出来
元组中的第二个元素就是相同key的value的集合
groupBy:按哪个条件分组不确定,不一定是通过key分组,并且没有单独把value拿出来,而是把整体进行分组。
❖ 小功能:WordCount
面试题:reduceByKey与groupByKey的区别,哪一种更具优势?
从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,返回结果是RDD[k,v],这样会减少落盘的数据量,而groupByKey只是按照key进行分组,不存在数据量减少的问题,reduceByKey性能比较高。【spark中,shuffle操作必须落盘处理,shuffle操作的性能非常低(与磁盘交互)】
从功能的角度:reduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
所以,在实际开发过程中,reduceByKey比groupByKey,更建议使用。但是需要注意是否会影响业务逻辑。
20) aggregateByKey➢ 函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
➢ 函数说明
将数据根据不同的规则进行分区内计算和分区间计算
(初始值为0)
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark17_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
// math.min(x, y)
// math.max(x, y)
// 分区内取最大值
// (a,5) (a,1) => (a,5) (a,2) =>(a,5); (b,5) (b,3) => (b,5)
// (b,4) (b,5) => (b,5) (b,5) => (b,5); (a,5) (a,6) => (a,6)
// 分区间求和 (b,5) (b,5) => (b,10); (a,5) (a,6) => (a,11)
// (b,10)
// (a,11)
// (b,12)
// (a,9)
// (b,12)
// (a,9)
rdd.aggregateByKey(5)(
(x, y) => math.max(x, y),
(x, y) => x + y
).collect.foreach(println)
// 分区内和分区间也可以做相同规则的计算
// 分区内
// (a,3),(b,3)
// (b,9),(a,6)
// 分区间
// (b,3),(b,9) => (b,12)
// (a,3),(a,6) => (a,9)
rdd.aggregateByKey(0)(
(x, y) => x + y,
(x, y) => x + y
).collect.foreach(println)
// scala简化原则
rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)
sc.stop()
}
}
总结:
①aggregateByKey存在函数柯里化,有两个参数列表, 没有默认值
a. 第一个参数列表,需要传递一个参数,表示为初始值
主要用于当碰见第一个key的时候,和value进行分区内计算
b. 第二个参数列表需要传递2个参数
第一个参数表示分区内计算规则
第二个参数表示分区间计算规则
需求:取得不同分区当中相同key的的平均值
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark18_RDD_Operator_Transform3 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
// 取得不同分区当中相同key的的平均值
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
// aggregateByKey最终的返回数据结果应该和初始值的类型保持一致
//val aggRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _)
//aggRDD.collect.foreach(println)
// 获取相同key的数据的平均值 => (a, 3),(b, 4)
// 第一个0是相同key计算的初始值,如a - 1,2,6, 第二个0是key出现的次数
// (0,0)表示相同key的初始值
// 返回的RDD[(K, U)] K就是key,即String, U就是tuple
val newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )(
( t, v ) => {
(t._1 + v, t._2 + 1) // v代表数量,分区内(数据相加, 次数相加)
},
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2) // 分区间相加
}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (num, cnt) => {
num / cnt
}
}
// (b,4)
// (a,3)
resultRDD.collect().foreach(println)
sc.stop()
}
}
21) foldByKey
➢ 函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
➢ 函数说明
当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark17_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
//rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)
// 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法
rdd.foldByKey(0)(_+_).collect.foreach(println)
sc.stop()
}
}
总结:
① aggregateByKey最终的返回数据结果应该和初始值的类型保持一致
22) combineByKey➢ 函数签名
def combineByKey[C](
createCombiner: V => C,
mergevalue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
➢ 函数说明
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark19_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(
// 之前的方法是有初始值, 所以就需要在tuple那个给数据类型,否则易报错
v => (v, 1),
// 分区内规则
( t:(Int, Int), v ) => {
(t._1 + v, t._2 + 1)
},
// 分区间规则
(t1:(Int, Int), t2:(Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (num, cnt) => {
num / cnt
}
}
resultRDD.collect().foreach(println)
sc.stop()
}
}
总结:
① combineByKey : 方法需要三个参数
第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
第二个参数表示:分区内的计算规则
第三个参数表示:分区间的计算规则
② aggregateByKey设置了初始值,但是key的次数并没有包括设置初始值的数量,不太合理,所以使用combineByKey对第一个数据结构进行转换,从a,1到a,(1,1)
③ mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。
面试题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
reduceByKey: 相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同AggregateByKey:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
➢ 函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
➢ 函数说明
在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的
➢ 函数签名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
➢ 函数说明
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark21_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd1 = sc.makeRDD(List(
("a", 1), ("a", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 5), ("c", 6),("a", 4)
))
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
// (a,(1,5))
// (a,(1,4))
// (a,(2,5))
// (a,(2,4))
// (c,(3,6))
joinRDD.collect().foreach(println)
sc.stop()
}
}
总结:
① join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
② 如果两个数据源中key没有匹配上,那么数据不会出现在结果中
③ 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
➢ 函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
➢ 函数说明
类似于SQL语句的左外连接
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark22_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2)//, ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5),("c", 6)
))
val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
// (a,(1,Some(4)))
// (b,(2,Some(5)))
leftJoinRDD.collect().foreach(println)
// val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
//(a,(Some(1),4))
// (b,(Some(2),5))
// (c,(None,6))
// rightJoinRDD.collect().foreach(println)
sc.stop()
}
}
26) cogroup
➢ 函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
➢ 函数说明
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark23_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - (Key - Value类型)
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2)//, ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5),("c", 6),("c", 7)
))
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
// (a,(CompactBuffer(1),CompactBuffer(4)))
// (b,(CompactBuffer(2),CompactBuffer(5)))
// (c,(CompactBuffer(),CompactBuffer(6, 7)))
cgRDD.collect().foreach(println)
sc.stop()
}
}
总结:
cogroup : connect + group (分组,连接) 同一个key分组在一起然后连接在一起
面试题:有哪些会引起Shuffle过程的Spark算子呢?
shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key拉取到同一个节点上,进行聚合或join等操作。
① 重分区算子:repartition、coalesce
② ByKey算子:groupByKey、reduceByKey、aggregateByKey、combineByKey、sortByKey、sortBy
③ join 算子: cogroup、join、leftOuterJoin、intersection、subtract、subtractByKey
④ 去重算子: distinct
4. Actions算子| reduce | collect | count | first | take |
| takeOrdered | aggregate | fold | countByKey | save 相关算子 |
| foreach |
所谓的行动算子,其实就是触发作业(Job)执行的方法,底层代码调用的是环境对象的runJob方法,底层代码中会创建ActiveJob,并提交执行。
请列举Spark的action算子(不少于6个),并简述功能(重点) 标黄色是必备
1) reduce➢ 函数签名
def reduce(f: (T, T) => T): T
➢ 函数说明
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// reduce 两两聚合1和2变为3, 3和3聚合为6,6和4聚合为10
val i: Int = rdd.reduce(_+_)
println(i) // 10
sc.stop()
}
}
2) collect
➢ 函数签名
def collect(): Array[T]
➢ 函数说明
在驱动程序中,以数组Array的形式返回数据集的所有元素
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
val ints: Array[Int] = rdd.collect()
// 结果为1,2,3,4
println(ints.mkString(","))
sc.stop()
}
}
3) count
➢ 函数签名
def count(): Long
➢ 函数说明
返回RDD中元素的个数
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// count : 数据源中数据的个数
val cnt = rdd.count()
// 结果为4
println(cnt)
sc.stop()
}
}
4) first
➢ 函数签名
def first(): T
➢ 函数说明
返回RDD中的第一个元素
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// first : 获取数据源中数据的第一个
val first = rdd.first()
// 结果是1
println(first)
sc.stop()
}
}
5) take
➢ 函数签名
def take(num: Int): Array[T]
➢ 函数说明
返回一个由RDD的前n个元素组成的数组
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// take : 获取N个数据
val ints: Array[Int] = rdd.take(3)
// 1,2,3
println(ints.mkString(","))
sc.stop()
}
}
6) takeOrdered
➢ 函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
➢ 函数说明
返回该RDD排序后的前n个元素组成的数组
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// takeOrdered : 数据排序后,取N个数据, 默认升序排列, 降序把第二个参数传进去
val rdd1 = sc.makeRDD(List(4,2,3,1))
val ints1: Array[Int] = rdd1.takeOrdered(3)(Ordering[Int].reverse)
// 4,3,2
println(ints1.mkString(","))
sc.stop()
}
}
7) aggregate
➢ 函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
➢ 函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
aggregateByKey : 初始值只会参与分区内计算
aggregate : 初始值会参与分区内计算,并且参与分区间计算
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.aggregate(10)(_+_, _+_)
//10 + 13 + 17 = 40
println(result)
sc.stop()
}
}
8) fold
➢ 函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
➢ 函数说明
折叠操作,aggregate的简化版操作
分区内和分区间计算规则相同
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)(_+_)
// 40
println(result)
sc.stop()
}
}
9) countByKey
➢ 函数签名
def countByKey(): Map[K, Long]
➢ 函数说明
统计每种key的个数
需求:统计每个value出现的次数
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,1,1,4),2)
// TODO - 行动算子
// 统计每个value出现的次数
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
// Map(4 -> 1, 1 -> 3)
println(intToLong)
sc.stop()
}
}
需求:统计每个key出现的次数
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a", 1),("a", 2),("a", 3)
))
// 统计每个key出现的次数和value没有关系
val stringToLong: collection.Map[String, Long] = rdd.countByKey()
// Map(a -> 3)
println(stringToLong)
sc.stop()
}
}
10) save 相关算子
➢ 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
➢ 函数说明
将数据保存到不同格式的文件中
package com.meiyuan.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//val rdd = sc.makeRDD(List(1,1,1,4),2)
val rdd = sc.makeRDD(List(
("a", 1),("a", 2),("a", 3)
))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
11) foreach
➢ 函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
➢ 函数说明
分布式遍历RDD中的每一个元素,调用指定函数
总结:
foreach外边的代码在Driver端执行
foreach内的代码在Executor执行
foreach用于换行打印
5. 不同算子实现的不同写法的WordCountpackage com.meiyuan.bigdata.spark.core.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark03_WordCount {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
wordcount1(sc)
wordcount2(sc)
wordcount3(sc)
wordcount4(sc)
wordcount5(sc)
wordcount6(sc)
wordcount7(sc)
wordcount8(sc)
wordcount9(sc)
sc.stop()
}
// groupBy
def wordcount1(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)
val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)
wordCount.collect().foreach(println)
println("wordcount1========================================")
}
// // groupByKey (shuffle,数据量大性能不高)
def wordcount2(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_,1)) // 要求数据有key和value类型
val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)
wordCount.collect().foreach(println)
println("wordcount2========================================")
}
// reduceByKey
def wordcount3(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_,1))
val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
wordCount.collect().foreach(println)
println("wordcount3========================================")
}
// aggregateByKey
def wordcount4(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_,1))
val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)
wordCount.collect().foreach(println)
println("wordcount4========================================")
}
// foldByKey
def wordcount5(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_,1))
val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)
wordCount.collect().foreach(println)
println("wordcount5========================================")
}
// combineByKey
def wordcount6(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_,1))
val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
v=>v,
(x:Int, y) => x + y,
(x:Int, y:Int) => x + y
)
wordCount.collect().foreach(println)
println("wordcount6========================================")
}
// countByKey
def wordcount7(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_,1))
val wordCount: collection.Map[String, Long] = wordOne.countByKey()
wordCount.foreach(println)
println("wordcount7========================================")
}
// countByValue
def wordcount8(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordCount: collection.Map[String, Long] = words.countByValue()
wordCount.foreach(println)
println("wordcount8========================================")
}
// reduce, aggregate, fold (scala)
def wordcount9(sc : SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
// 【(word, count),(word, count)】
// word => Map[(word,1)]
val mapWord = words.map(
word => {
mutable.Map[String, Long]((word,1))
}
)
// 把两个map合并在一起, 需要相同key将value做聚合, 不同key做累加
// map2这里是k-v对, 用foreach取出来k-v对, map1进行聚合
val wordCount = mapWord.reduce(
(map1, map2) => {
map2.foreach{
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
map1
}
)
println(wordCount)
println("wordcount9========================================")
}
}
6. 需求分析
需求 1:统计出每一个省份每个广告被点击数量排行的Top3
1) 数据准备
agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
2) 需求分析如下:
1. 获取原始数据:时间戳,省份,城市,用户,广告
2. 将原始数据进行结构的转换。方便统计
时间戳,省份,城市,用户,广告 => ( ( 省份,广告 ), 1 )
3. 将转换结构后的数据,进行分组聚合
( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )
4. 将聚合的结果进行结构的转换
( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) 模式匹配
5. 将转换结构后的数据根据省份进行分组
( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )
6. 将分组后的数据组内排序(降序),取前3名
7. 采集数据打印在控制台
3) 功能实现
package com.meiyuan.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark24_RDD_Req {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 案例实操
// 统计每一个省份每个广告被点击数量排行的Top3
// 1. 获取原始数据:时间戳,省份,城市,用户,广告
val dataRDD = sc.textFile("data/agent.log")
// 2. 将原始数据进行结构的转换。方便统计
// 时间戳,省份,城市,用户,广告
// =>
// ( ( 省份,广告 ), 1 )
val mapRDD = dataRDD.map(
line => {
val data = line.split(" ")
(( data(1), data(4) ), 1)
}
)
// 3. 将转换结构后的数据,进行分组聚合
// ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_)
// 4. 将聚合的结果进行结构的转换
// ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) 模式匹配
val newMapRDD = reduceRDD.map{
case ( (prv, ad), sum ) => {
(prv, (ad, sum))
}
}
// 5. 将转换结构后的数据根据省份进行分组
// ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )
val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
// 6. 将分组后的数据组内排序(降序),取前3名
val resultRDD = groupRDD.mapValues( // key保持不变只对value操作进行数据转换
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
} // 可迭代的集合不能排序, ._2代表迭代器中tuple的第二个元素
)
// 7. 采集数据打印在控制台
resultRDD.collect().foreach(println)
sc.stop()
}
}
需求2~4数据
// 用户访问动作表 case class UserVisitAction( date: String,// 用户点击行为的日期 user_id: Long,// 用户的 ID session_id: String,//Session 的 ID page_id: Long,// 某个页面的 ID action_time: String,// 动作的时间点 search_keyword: String,// 用户搜索的关键词 click_category_id: Long,// 某一个商品品类的 ID click_product_id: Long,// 某一个商品的 ID order_category_ids: String,// 一次订单中所有品类的 ID 集合 order_product_ids: String,// 一次订单中所有商品的 ID 集合 pay_category_ids: String,// 一次支付中所有品类的 ID 集合 pay_product_ids: String,// 一次支付中所有商品的 ID 集合 city_id: Long ) // 城市 id
数据主要包含用户的 4 种行为: 搜索,点击,下单,支付 。 数据规则如下:
➢ 数据文件中每行数据采用 下划线 分隔数据
➢ 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
➢ 如果搜索关键字为 null, 表示数据不是搜索数据
➢ 如果点击的品类 ID 和产品 ID 为 -1 ,表示数据不是点击数据
➢ 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个, id 之
间采用 逗号 分隔,如果本次不是下单行为,则数据采用 null 表示
➢ 支付行为和下单行为类似
需求 2 Top10 热门品类
1) 需求优化::先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
方案1:分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
需求分析如下:
1.读取原始日志数据
2.统计品类的点击数量: (品 类ID, 点击数量) 【先过滤出-1不是点击行为再聚合】
3. 统计品类的下单数量: (品类ID, 下单数量)
【先过滤出null不是下单行为且扁平化order_category_ids(品类id用逗号隔开)再聚合】
4. 统计品类的支付数量: (品类ID, 支付数量)
【先过滤出null不是支付行为且扁平化order_category_ids再聚合】
5. 将品类进行排序,并且取前10名
点击数量排序, 下单数量排序, 支付数量排序
元组排序:先比较第一个, 再比较第二个,再比较第三个,依此类推
( 品类ID, (点击数量,下单数量, 支付数量) )
cogroup = connect + group
将两个不同数据源连接在一起, join, 拉链zip, leftOuterJoin, cogroup
① join × 数据源需要有相同的key才能连一起, 即有点击不一定有下单
② zip × 分区的数量和分区元素的数量有要求, 但是题目没说要把相同品类放在一起, 所以zip的连接是和数量以及位置有关
③ leftOuterJoin不确定哪个是主表 ×
④ cogroup √ 会在自己的数据源中建立分组, 和另外一个数据源做connect连接, connect + group 即使不存在也会有组的概念的存在
6.将结果采集到控制台打印出来
功能实现:注意迭代器的使用
package com.meiyuan.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("data/user_visit_action.txt")
// 2. 统计品类的点击数量:(品类ID,点击数量)
// 点击的品类ID为-1, 表示数据不是点击数据, click_category_id是7,索引是6
val clickActionRDD = actionRDD.filter(
action => {
val data = action.split("_")
data(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val data = action.split("_")
(data(6), 1) // (点击品类ID,1)
}
).reduceByKey(_ + _)
// 3. 统计品类的下单数量:(品类ID,下单数量)
// 如果本次不是下单行为,则数据采用null表示
val orderActionRDD = actionRDD.filter(
action => {
val data = action.split("_")
data(8) != "null"
}
)
// 针对下单行为, 一次可以下单多个商品, 所以品类ID和产品ID可以是多个
// 所以需要把多个品类ID, 拆成一个一个, 比如:orderID => 1,2,3
// 【(1,1),(2,1),(3,1)】, 所以是扁平化操作
val orderCountRDD = orderActionRDD.flatMap(
action => {
val data = action.split("_")
val cid = data(8)
val cids = cid.split(",") //多个品类ID是用逗号隔开
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = actionRDD.filter(
action => {
val data = action.split("_")
data(10) != "null"
}
)
// orderid => 1,2,3
// 【(1,1),(2,1),(3,1)】
val payCountRDD = payActionRDD.flatMap(
action => {
val data = action.split("_")
val cid = data(10)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
// 5. 将品类进行排序,并且取前10名
// 点击数量排序,下单数量排序,支付数量排序
// 元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
clickCountRDD.cogroup(orderCountRDD, payCountRDD)
val analysisRDD = cogroupRDD.mapValues{
case ( clickIter, orderIter, payIter ) => {
var clickCnt = 0
val iter1 = clickIter.iterator
if ( iter1.hasNext ) {
clickCnt = iter1.next()
}
var orderCnt = 0
val iter2 = orderIter.iterator
if ( iter2.hasNext ) {
orderCnt = iter2.next()
}
var payCnt = 0
val iter3 = payIter.iterator
if ( iter3.hasNext ) {
payCnt = iter3.next()
}
( clickCnt, orderCnt, payCnt )
}
}
// 品类是._1, 所以是._2
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 6. 将结果采集到控制台打印出来
resultRDD.foreach(println)
sc.stop()
}
}
方案2:一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))
方案1优化:
actionRDD重复使用:actionRDD.cache()
cogroup性能较低 (分区规则不同就会有可能启动shuffle),第5步重写成下面的逻辑
(品类ID, 点击数量) => (品类ID, (点击数量, 0, 0))
(品类ID, 下单数量) => (品类ID, (0, 下单数量, 0))
两两聚合在一起 => (品类ID, (点击数量, 下单数量, 0))
(品类ID, 支付数量) => (品类ID, (0, 0, 支付数量))
两两聚合在一起 => (品类ID, (点击数量, 下单数量, 支付数量))
最终和方案1结果一样的( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
在方案1基础上优化的功能实现:
package com.meiyuan.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Req1_HotCategoryTop10Analysis1 {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// Q : actionRDD重复使用
// Q : cogroup性能可能较低 (如果分区不相同就会有shuffle)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("data/user_visit_action.txt")
actionRDD.cache() // actionRDD重复使用解决方案
// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRDD.filter(
action => {
val data = action.split("_")
data(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val data = action.split("_")
(data(6), 1)
}
).reduceByKey(_ + _)
// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
// orderid => 1,2,3
// 【(1,1),(2,1),(3,1)】
val orderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
// orderid => 1,2,3
// 【(1,1),(2,1),(3,1)】
val payCountRDD = payActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
// (品类ID, 点击数量) => (品类ID, (点击数量, 0, 0))
// (品类ID, 下单数量) => (品类ID, (0, 下单数量, 0))
// 两两聚合在一起 => (品类ID, (点击数量, 下单数量, 0))
// (品类ID, 支付数量) => (品类ID, (0, 0, 支付数量))
// 两两聚合在一起 => (品类ID, (点击数量, 下单数量, 支付数量))
// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
// 5. 将品类进行排序,并且取前10名
// 点击数量排序,下单数量排序,支付数量排序
// 元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
val rdd1 = clickCountRDD.map{
case ( cid, cnt ) => {
(cid, (cnt, 0, 0))
}
}
val rdd2 = orderCountRDD.map{
case ( cid, cnt ) => {
(cid, (0, cnt, 0))
}
}
val rdd3 = payCountRDD.map{
case ( cid, cnt ) => {
(cid, (0, 0, cnt))
}
}
// 将三个数据源合并在一起,统一进行聚合计算
val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)
// 相同的key两两聚合 连三次
val analysisRDD = sourceRDD.reduceByKey(
( t1, t2 ) => {
( t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 6. 将结果采集到控制台打印出来
resultRDD.foreach(println)
sc.stop()
}
}
方案3:一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))
方案2优化:
存在大量的shuffle操作(reduceByKey - 不同的数据源)
reduceByKey 聚合算子,spark会提供优化(预聚合),缓存(不需要重复读取),但是这个案例是来自不同数据源的reduceByKey,会有shuffle操作。实际上,这里不用在reduceByKey之后再分别统计数量。在一开始就按照下面的数据结构统计,这样只会有一次reduceByKey:
点击的场合 : ( 品类ID,( 1, 0, 0 ) )
下单的场合 : ( 品类ID,( 0, 1, 0 ) )
支付的场合 : ( 品类ID,( 0, 0, 1 ) )
功能实现:
package com.meiyuan.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_Req1_HotCategoryTop10Analysis2 {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// Q : 存在大量的shuffle操作(reduceByKey - 不同的数据源)
// reduceByKey 聚合算子,spark会提供优化,缓存
// 1. 读取原始日志数据
val actionRDD = sc.textFile("data/user_visit_action.txt")
// 2. 将数据转换结构
// 点击的场合 : ( 品类ID,( 1, 0, 0 ) )
// 下单的场合 : ( 品类ID,( 0, 1, 0 ) )
// 支付的场合 : ( 品类ID,( 0, 0, 1 ) )
// word没变, count微变 之前讲的方法是word变了, count没变
// 一次下单可能会有多个品类ID, 所以用flatMap,返回结果应该是List
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val data = action.split("_")
if (data(6) != "-1") {
// 点击的场合
List((data(6), (1, 0, 0)))
} else if (data(8) != "null") {
// 下单的场合
val ids = data(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (data(10) != "null") {
// 支付的场合
val ids = data(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
// 3. 将相同的品类ID的数据进行分组聚合
// ( 品类ID,( 点击数量, 下单数量, 支付数量 ) )
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
// 4. 将统计结果根据数量进行降序处理,取前10名
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 5. 将结果采集到控制台打印出来
resultRDD.foreach(println)
sc.stop()
}
}
方案4:使用累加器的方式聚合数据 - 不使用shuffle操作
功能实现:
package com.meiyuan.bigdata.spark.core.req
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark04_Req1_HotCategoryTop10Analysis3 {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("data/user_visit_action.txt")
val acc = new HotCategoryAccumulator
sc.register(acc, "hotCategory")
// 2. 将数据转换结构
actionRDD.foreach(
action => {
val data = action.split("_")
if (data(6) != "-1") {
// 点击的场合
acc.add((data(6), "click"))
} else if (data(8) != "null") {
// 下单的场合
val ids = data(8).split(",")
ids.foreach(
id => {
acc.add( (id, "order") )
}
)
} else if (data(10) != "null") {
// 支付的场合
val ids = data(10).split(",")
ids.foreach(
id => {
acc.add( (id, "pay") )
}
)
}
}
)
val accVal: mutable.Map[String, HotCategory] = acc.value
val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)
// 可迭代的集合不能排序, 需要转为toList; 自定义降序排列
val sort = categories.toList.sortWith(
(left, right) => {
if ( left.clickCnt > right.clickCnt ) {
true
} else if (left.clickCnt == right.clickCnt) {
if ( left.orderCnt > right.orderCnt ) {
true
} else if (left.orderCnt == right.orderCnt) {
left.payCnt > right.payCnt
} else {
false
}
} else {
false
}
}
)
// 5. 将结果采集到控制台打印出来
sort.take(10).foreach(println)
sc.stop()
}
// case class 参数默认是val,下面的clickCnt、orderCnt和payCnt可以改,需要加var
case class HotCategory( cid:String, var clickCnt : Int, var orderCnt : Int, var payCnt : Int )
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]{
private val hcMap = mutable.Map[String, HotCategory]()
override def isZero: Boolean = {
hcMap.isEmpty
}
override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
new HotCategoryAccumulator()
}
override def reset(): Unit = {
hcMap.clear()
}
override def add(v: (String, String)): Unit = {
val cid = v._1
val actionType = v._2
val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0,0,0))
if ( actionType == "click" ) {
category.clickCnt += 1
} else if (actionType == "order") {
category.orderCnt += 1
} else if (actionType == "pay") {
category.payCnt += 1
}
hcMap.update(cid, category)
}
override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
val map1 = this.hcMap
val map2 = other.value
map2.foreach{
case ( cid, hc ) => {
val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0,0,0))
category.clickCnt += hc.clickCnt
category.orderCnt += hc.orderCnt
category.payCnt += hc.payCnt
map1.update(cid, category)
}
}
}
override def value: mutable.Map[String, HotCategory] = hcMap
}
}
需求 3 Top10 热门品类中每个品类的Top10活跃Session统计
1) 需求说明:在需求一的基础上,增加每个品类用户session的点击统计
2) 功能实现:在需求2得到Top10Ids的基础上做需求3
package com.meiyuan.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_Req2_HotCategoryTop10SessionAnalysis {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
val actionRDD = sc.textFile("data/user_visit_action.txt")
actionRDD.cache()
val top10Ids: Array[String] = top10Category(actionRDD)
// 1. 过滤原始数据,保留点击和前10品类ID
val filterActionRDD = actionRDD.filter(
action => {
val data = action.split("_")
if ( data(6) != "-1" ) {
top10Ids.contains(data(6))
} else {
false
}
}
)
// 2. 根据品类ID和sessionid进行点击量的统计
val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
action => {
val data = action.split("_")
((data(6), data(2)), 1)
}
).reduceByKey(_ + _)
// 3. 将统计的结果进行结构的转换
// (( 品类ID,sessionId ),sum) => ( 品类ID,(sessionId, sum) )
val mapRDD = reduceRDD.map{
case ( (cid, sid), sum ) => {
( cid, (sid, sum) )
}
}
// 4. 相同的品类进行分组
val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
// 5. 将分组后的数据进行点击量的排序,取前10名
val resultRDD = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
)
resultRDD.collect().foreach(println)
sc.stop()
}
def top10Category(actionRDD:RDD[String]) = {
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val data = action.split("_")
if (data(6) != "-1") {
// 点击的场合
List((data(6), (1, 0, 0)))
} else if (data(8) != "null") {
// 下单的场合
val ids = data(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (data(10) != "null") {
// 支付的场合
val ids = data(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )
}
)
analysisRDD.sortBy(_._2, false).take(10).map(_._1)
}
}
需求 4 页面单跳转换率统计
1) 页面跳转率定义:计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。
2) 功能实现:
获取源数据=>根据用户的回话进行分组=>分组后组内根据时间进行排序保证页面跳转顺序是对的=>只保留页面后分别计算分母和分子
分母比较简单,即wordCount
分子需要保证连续的页面访问,相邻的元素形成整体,在统计出现的次数
页面单跳转率=分子/分母
package com.meiyuan.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_Req3_PageflowAnalysis {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
val actionRDD = sc.textFile("data/user_visit_action.txt")
val actionDataRDD = actionRDD.map(
action => {
val data = action.split("_")
UserVisitAction(
data(0),
data(1).toLong,
data(2),
data(3).toLong,
data(4),
data(5),
data(6).toLong,
data(7).toLong,
data(8),
data(9),
data(10),
data(11),
data(12).toLong
)
}
)
actionDataRDD.cache()
// TODO 对指定的页面连续跳转进行统计
// 1-2,2-3,3-4,4-5,5-6,6-7
val ids = List[Long](1,2,3,4,5,6,7)
val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)
// TODO 计算分母
val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(
action => {
ids.init.contains(action.page_id) // 除了最后一个7
}
).map(
action => {
(action.page_id, 1L)
}
).reduceByKey(_ + _).collect().toMap
// TODO 计算分子
// 根据session进行分组
val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)
// 分组后,根据访问时间进行排序(升序)
val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
iter => {
val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
// 【1,2,3,4】
// 【1,2】,【2,3】,【3,4】
// 【1-2,2-3,3-4】
// Sliding : 滑窗
// 【1,2,3,4】
// 【2,3,4】
// zip : 拉链
val flowIds: List[Long] = sortList.map(_.page_id)
val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
// 将不合法的页面跳转进行过滤
pageflowIds.filter(
t => {
okflowIds.contains(t)
}
).map(
t => {
(t, 1)
}
)
}
)
// ((1,2),1)
val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list=>list)
// ((1,2),1) => ((1,2),sum)
val dataRDD = flatRDD.reduceByKey(_+_)
// TODO 计算单跳转换率
// 分子除以分母
dataRDD.foreach{
case ( (pageid1, pageid2), sum ) => {
val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)
println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + ( sum.toDouble/lon ))
}
}
sc.stop()
}
//用户访问动作表
case class UserVisitAction(
date: String,//用户点击行为的日期
user_id: Long,//用户的ID
session_id: String,//Session的ID
page_id: Long,//某个页面的ID
action_time: String,//动作的时间点
search_keyword: String,//用户搜索的关键词
click_category_id: Long,//某一个商品品类的ID
click_product_id: Long,//某一个商品的ID
order_category_ids: String,//一次订单中所有品类的ID集合
order_product_ids: String,//一次订单中所有商品的ID集合
pay_category_ids: String,//一次支付中所有品类的ID集合
pay_product_ids: String,//一次支付中所有商品的ID集合
city_id: Long
)//城市 id
}
在实际工作中,这种写法不好扩展功能和维护。而是按照下面的三重架构模式编写代码:
7. 其他相关面试题
1. 如何使用Spark实现TopN的获取(描述思路或使用伪代码)?
方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)
注意:当数据量太大时,会导致OOM
方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
2. 现场写一个笔试题
hdfs文件,文件每行的格式为作品ID,用户id,用户性别。请用一个spark任务实现以下功能:统计每个作品对应的用户(去重后)的性别分布。输出格式如下:作品ID,男性用户数量,女性用户数量
答案:
sc.textfile().flatmap(.split(",")) //分割成作品ID,用户id,用户性别
.map(((_.1,_._2),1)) //((作品id,用户性别),1)
.reduceByKey(_+_) //((作品id,用户性别),n)
.map(_._1._1,_._1._2,_._2) //(作品id,用户性别,n)
参考资料:
https://blog.csdn.net/yangshengwei230612/article/details/115383518
spark常用算子_chbxw-CSDN博客
spark系列6:常用RDD介绍与演示_涤生手记-CSDN博客



