Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
-
RDD : 弹性分布式数据集
-
累加器:分布式共享只写变量
-
广播变量:分布式共享只读变量
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
# 弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
# 分布式:数据存储在大数据集群不同节点上
# 数据集:RDD封装了计算逻辑,并不保存数据
# 数据抽象:RDD是一个抽象类,需要子类具体实现
# 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新 的RDD里面封装计算逻辑
# 可分区、并行计算
核心属性
执行原理
1.启动Yarn集群环境
2.Spark通过申请资源创建调度节点和计算节点
3.Spark框架根据需求将计算逻辑根据分区划分成不同的任务
4.调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
RDD的创建主要的创建方式有两种:从集合(内存中创建),从外部存储(文件)中创建
从内存中创建
从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
List(1,2,3,4)
)
//这种方法用的多,其实底层就是parallelize()
val rdd2 = sparkContext.makeRDD(
List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
从文件中创建
由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、Hbase等。
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input") //主持目录或者通配符pihao*.txt
fileRDD.collect().foreach(println)
sparkContext.stop()
RDD并行度与分区
默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
内存创建RDD分区策略
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
// conf.set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)
// TODO 从内存中创建RDD - 分区
// 1. 如果构建RDD时,没有指定数据处理分区的数量,那么会使用默认分区数量
// makeRDD方法存在第二个参数,这个参数表示分区数量numSlices(存在默认值)
// scheduler.conf.getInt("spark.default.parallelism", totalCores)
// totalCores : 当前Master环境的总(虚拟)核数
// 分区设置的优先级 : 方法参数 > 配置参数 > 环境配置
// kafka生产者分区策略
// 【1,3,5】【2,4】 : 轮询(2个分区)
// 【1,2, 3】【5,4】 :范围(2个分区)
// 【1,2】【3,4】【5】 :范围(3个分区)
// Spark分区策略
// 【1,2】【3,4,5】 :范围(2个分区)
// 【1】【2,3】【4,5】:范围(3个分区)
val rdd1 : RDD[Int] = sc.makeRDD(
Seq(1,2,3,4,5), 3
)
// saveAsTextFile方法可以生成分区文件
rdd1.saveAsTextFile("output2")
sc.stop()
}
文件创建RDD分区策略
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 从文件中创建RDD
// textFile方法可以在读取文件时,设定分区
// 设定分区时,应该传递第二个参数,如果不设定,存在默认值
// math.min(defaultParallelism, 2)
// 第二个参数表示最小分区数,所以最终的分区数量可以大于这个值的。
// TODO 1. spark读取文件其实底层就是hadoop读取文件
// TODO 2. spark的分区数量其实就来自于hadoop读取文件的切片
// (想要切片数量)numSplits = 2
// totalSize = 7
// (预计每个分区的字节大小)goalSize = totalSize / numSplits = 7 / 2 = 3
// splitSize = Math.max(minSize(1), Math.min(goalSize(5G), blockSize(128M))) = 128;
// 7 / 3 = 2...1 = 2 + 1 = 3
val rdd = sc.textFile("data/word.txt", 2)
rdd.saveAsTextFile("output")
sc.stop()
}
TODO 1. 分区数据的处理也是由Hadoop决定的。
TODO 2. hadoop在计算分区时会处理数据时的逻辑不一样。
TODO 3. Spark读取文件数据底层使用的就是hadoop读取的,所以读取规则用的是hadoop
3.1 hadoop读取数据是按行读取的,不是按字节读取
3.2 hadoop读取数据是偏移量读取的
3.3 hadoop读取数据时,不会重复读取相同的偏移量
原始数据(13个字节):
123
456
789
1. 计算有多少个分区?
13 / 3 = 4 (每个分区放4个字节)
13 / 4 = 3...1 = 3 + 1 = 4 (最终确定4个分区)
2. 计算每个分区放什么数据?
123@@ => 01234
456@@ => 56789
789 => 101112
************************************
计算读取偏移量(每个分区4个字节),从0开始,总共13个字节
[0, 4] => [123] (0~4,第一行读完)
[4, 8] => [456] (4已经被读了,从5开始,5-8,但是读就会读一行,9也被读了)
[8, 12] => [789](9被读了,从10开始)
[12, 13] => [] (没数据了)
RDD算子
RDD转换算子将RDD的方法称为算子的原因是为了和Scala集合的方法进行区分,RDD的方法很多,但是一般分为两大类:
逻辑封装,将旧的逻辑转换为新的逻辑,称之为转换算子
执行逻辑,将封装好的逻辑开始执行,让整个作业运行起来,称之为行动算子
Value类型 mapRDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型
//使用案例
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - map
val rdd = sc.makeRDD(List(1,2,3,4))
// map算子表示将数据源中的每一条数据进行处理
// map算子的参数是函数类型: Int => U(不确定)
def mapFunction( num : Int ): Int = {
num * 2
}
// A => B 把旧的RDD转换成新的RDD,这个代码不会执行
//val rdd1: RDD[Int] = rdd.map(mapFunction)
val rdd1: RDD[Int] = rdd.map(_ * 2)
rdd1.collect().foreach(println) //collect()是行动算子,会执行
sc.stop()
}
//存在分区的情况:在RDD进行转换时,新的RDD分区数量与旧的RDD分区数量保持一致
//数据在处理过程中,默认的分区不变,原来在哪个分区,现在也在哪个分区
//数据在处理过程中,遵循执行的顺序:分区内有序,分区间无序,前提是多线程
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// 设置两个分区: 【1,2】,【3,4】
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// TODO 分区 - 2
// 数据执行顺序
//rdd就是逻辑的封装,如果有多个Rdd的话,那么第一条数据应该是所有的逻辑执行完毕后,才执行下一条数据
val rdd1: RDD[Int] = rdd.map(
num => {
println("********* num = " + num)
num
}
)
val rdd2: RDD[Int] = rdd1.map(
num => {
println("######## num = " + num)
num
}
)
rdd2.saveAsTextFile("output")
sc.stop()
}
小练习
从服务器日志数据apache.log中获取用户请求URL资源路径
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
val lineRDD = sc.textFile("data/apache.log")
val url = lineRDD.map(
line => {
val data = line.split(" ")
data(6)
}
)
url.collect().foreach(println)
}
mapPartitions
以一个分区的数据进行转换,有点类似批处理的感觉,将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
mapPartitions的入参函数的入参和返回值类型都要求是可迭代的对象
//可以过滤数据
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_==2)
}
)
//以分区的数据做转换
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 -
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val rdd1 = rdd.mapPartitions(
list => {
println("*********************") //走两遍,如果是map的话要走4遍
list.map(_*2)
}
)
rdd1.collect().foreach(println)
sc.stop()
}
小练习
获取每个分区的最大值
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 -
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 获取每个数据分区的最大值
// 【1,2】【3,4】
val rdd1 = rdd.mapPartitions(
list => {
val max = list.max
List(max).iterator //包装转换成iterator类型
}
)
rdd1.collect.foreach(println)
sc.stop()
}
map和mapPartitions的区别
#数据处理角度 Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。 #功能的角度 Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据 #性能的角度 Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。 #完成比完美更重要mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
假如有三个分区,我只要想第二个分区的数量,要怎么做呢
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)
// 0 1 2 分区索引从0开始
// 【1,2】,【3,4】,【5,6】
val rdd1 = rdd.mapPartitionsWithIndex(
(ind, list) => {
if ( ind == 1 ) {
list
} else {
Nil.iterator
}
}
)
rdd1.collect().foreach(println)
sc.stop()
}
flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - 扁平化
val rdd = sc.makeRDD(
List(
"Hello Scala", "Hello Spark"
)
)
val rdd1 = sc.makeRDD(
List(
List(1,2), List(3,4)
)
)
// 整体 => 个体
//val rdd2 = rdd.flatMap(_.split(" "))
val rdd2 = rdd.flatMap(
str => { // 整体(1)
// 容器 接受-->(个体(N))
str.split(" ")
}
)
val rdd3 = rdd1.flatMap(
list => {
list //这里的两个list的含义不同
}
)
rdd3.collect().foreach(println)
sc.stop()
}
小案例
将List(List(1,2),3,List(4,5))进行扁平化操作
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(
List(List(1,2),3,List(4,5))
)
val rdd1 = rdd.flatMap { //数据类型不一致,使用模式匹配来做
case list : List[_] => list
case other => List(other)
}
rdd1.collect.foreach(println)
sc.stop()
}
glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
//两分区 123,456
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)
val rdd1: RDD[Array[Int]] = rdd.glom() //类型Array[Int]
rdd1.collect().foreach(a => println(a.mkString(",")))// 123 456
sc.stop()
}
小案例
计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - 扁平化
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)
val rdd1: RDD[Array[Int]] = rdd.glom()
val rdd2: RDD[Int] = rdd1.map(_.max)
println(rdd2.collect().sum) // 9
sc.stop()
}
groupBy
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。
一个组的数据必须在一个分区中,但是并不是说一个分区中只有一个组(假如只有一个分区,多个组都放)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test") //落盘会一闪而过
val sc = new SparkContext(conf)
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
// groupBy算子根据函数计算结果进行分组。
// groupBy算子执行结果为KV数据类型
// k是为分组的标识, v就是同一个组的数据集合
val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
// 1 => 1
// 2 => 0
// 3 => 1
// 4 => 0
// 5 => 1
// 6 => 0
sc.stop()
}
//分组后,一个分区的数据被打乱重新和其他分区的数据组合在一起,这个操作称为Shuffle
//shuffle操作不允许在内存中等待,必须落盘
shuffle会将完整的计算过程一分为二,形成两个阶段,一个阶段用于写数据,一个阶段用于读数据,写数据的过程没有完成不允许读数据
shuffle的操作是可以更改分区的。会浪费资源你懂的
小练习1
将List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组
从服务器日志数据apache.log中获取每个时间段访问量
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// 将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组
// val rdd = sc.makeRDD(
// List("Hello", "hive", "hbase", "Hadoop")
// )
//
// val rdd1 = rdd.groupBy(_.substring(0, 1).toUpperCase())
// rdd1.collect.foreach(println)
//178.152.18.59 - - 19/05/2015:04:05:08 +0000 GET /reset.css
// 从服务器日志数据apache.log中获取每个时间段访问量
// (10, 100), (11, 101)
val lines = sc.textFile("data/apache.log")
// (time, List((time,1),(time, 1)))
// TODO groupBy算子可以实现 WordCount ( 1 / 10 )
val groupRDD: RDD[(String, Iterable[(String, Int)])] = lines.map(
lines => {
val datas = lines.split(" ")
val time = datas(3)
val times = time.split(":")
(times(1), 1)
}
).groupBy(_._1)
val timeCnt: RDD[(String, Int)] = groupRDD.mapValues(_.size)
timeCnt.collect.foreach(println)
sc.stop()
}
filter
按照指定的规则对每一条数据筛选过滤
true为保留,false丢弃
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
// filter算子可以按照指定的规则对每一条数据进行筛选过滤
// 数据处理结果为true,表示数据保留,如果为false,数据就丢弃
val rdd1 = rdd.filter(
num => num % 2 == 1
)
rdd1.collect.foreach(println)
sc.stop()
}
小案例
从服务器日志数据apache.log中获取2015年5月17日的请求路径
178.152.18.59 - - 19/05/2015:04:05:08 +0000 GET /reset.css
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换
val lines = sc.textFile("data/apache.log")
val filterLines = lines.filter(
line => {
//line.contains("17/05/2015")
val datas = line.split(" ")
val time = datas(3)
time.startsWith("17/05/2015")
}
)
val r = filterLines.map(
line => {
val datas = line.split(" ")
datas(6)
}
)
r.collect().foreach(println)
sc.stop()
}
sample
根据指定的规则从数据集中抽取数据
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换
val rdd : RDD[Int] = sc.makeRDD(1 to 10)
// 抽取数据,采样数据
// 第一个参数表示抽取数据的方式:true. 抽取放回,false. 抽取不放回
// 第二个参数和第一个参数有关系
// 如果抽取不放回的场合:参数表示每条数据被抽取的几率
// 如果抽取放回的场合:参数表示每条数据希望被重复抽取的次数
// 第三个参数是【随机数】种子
// 随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
// 3 = xxxxx(10)
// 7 = xxxxx(3)
//val rdd1: RDD[Int] = rdd.sample(false, 0.5)
//val rdd1: RDD[Int] = rdd.sample(true, 2)
val rdd1: RDD[Int] = rdd.sample(false, 0.5, 2)
rdd1.collect.foreach(println)
sc.stop()
}
coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - 缩减分区
val rdd : RDD[Int] = sc.makeRDD(
List(1,2,3,4,5,6), 3
)
// 缩减 (合并), 默认情况下,缩减分区不会shuffle,还可能把两个数据多的分区合并了
//val rdd1: RDD[Int] = rdd.coalesce(2)
// 这种方式在某些情况下,无法解决数据倾斜问题,所以还可以在缩减分区的同时,进行数据的shuffle操作
val rdd2: RDD[Int] = rdd.coalesce(2, true)
rdd.saveAsTextFile("output")
rdd2.saveAsTextFile("output1")
sc.stop()
}
distinct
去重
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
val rdd : RDD[Int] = sc.makeRDD(
List(1,1,1)
)
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// 【1,1,1】
// 【(1, null),(1, null),(1, null)】
// 【null, null, null】
// 【null, null】
// 【(1, null)】
// 【1】
val rdd1: RDD[Int] = rdd.distinct() // 存在shuffle,因此distinct()可以传入分区数
rdd1.collect.foreach(println)
//List(1,1,1,1,1).distinct 与单点集合的去重方法区分开
sc.stop()
}
repartition
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - 缩减分区
val rdd : RDD[Int] = sc.makeRDD(
List(1,2,3,4,5,6), 2
)
// 扩大分区 - repartition
// 在不shuffle的情况下,coalesce算子扩大分区是没有意义的。
//val rdd1: RDD[Int] = rdd.coalesce(3, true)
val rdd1: RDD[Int] = rdd.repartition(3) // shuffle默认为true
rdd.saveAsTextFile("output")
rdd1.saveAsTextFile("output1")
sc.stop()
}
sortBy
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - 排序
val rdd : RDD[Int] = sc.makeRDD(
List(1,4,3,2,6,5),2
)
//肯定存在shuffle,刚开始是143,265,现在排序后变成了123,456
val rdd1: RDD[Int] = rdd.sortBy(num => num, false) //false表示降序
println(rdd1.collect.mkString(",")) // 1,2,3,4,5,6
sc.stop()
}
双Value类型
intersection,union,intersection,zip
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - 排序
val rdd : RDD[Int] = sc.makeRDD(
List(1,2,3,4),2
)
val rdd1 : RDD[Int] = sc.makeRDD(
List(3,4,5,6),2
)
val rdd2 : RDD[String] = sc.makeRDD(
List("3","4","5", "6"),2
)
// 交集
//println(rdd.intersection(rdd1).collect().mkString(","))
// 并集
//println(rdd.union(rdd1).collect().mkString(",")) //1234445
// 差集
//println(rdd.subtract(rdd1).collect().mkString(","))
// 交集并集差集的类型必须一样
// 拉链:前提条件分区数量相等,每个分区的中的元素个数相同
// 英文翻译:
// Can only zip RDDs with same number of elements in each partition
// Can't zip RDDs with unequal numbers of partitions: List(2, 3)
//println(rdd.zip(rdd1).collect().mkString(","))
println(rdd.zip(rdd2).collect().mkString(",")) //类型可以不一样
sc.stop()
}
Key-Value类型
partitionBy
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - K-V
val rdd : RDD[Int] = sc.makeRDD(
List(1,2,3,4),2
)
rdd.sortBy(num=>num)
//rdd.partitionBy(null);
// partitionBy: 算子是根据指定的规则对每一条数据进行重分区
// repartition : 强调分区数量的变化,数据怎么变不关心
// partitionBy : 关心数据的分区规则
val rdd1: RDD[(Int, Int)] = rdd.map((_, 1))
// 下面调用RDD对象的partitionBy方法一定会报错。
// 二次编译(隐式转换)
// RDD => PairRDDFunctions
// HashPartitioner是Spark中默认shuffle分区器,也就是hashcode多数量取余
rdd1.partitionBy(new HashPartitioner(2)).saveAsTextFile("output");
sc.stop()
}
reduceByKey
可以将数据按照相同的Key对Value进行聚合
reduceByKey可以在shuffle之前,对分区内的数据进行预聚合,称之为combine,稍后会讲
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - reduceByKey
val rdd : RDD[(String, Int)] = sc.makeRDD(
List(
("a", 1),
("a", 1),
("a", 1)
)
)
// reduceByKey算子的作用,是将相同的key的value分在一个组中,然后进行reduce操作
// TODO reduceByKey可以实现WordCount ( 2 / 10 )
val wordCount: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
wordCount.collect.foreach(println)
sc.stop()
}
groupByKey
将数据源的数据根据key对value进行分组
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 算子 - 转换 - groupByKey
val rdd : RDD[(String, Int)] = sc.makeRDD(
List(
("a", 1),
("a", 1),
("a", 1)
)
)
val value: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
// TODO groupByKey & groupBy
// 1. groupBy不需要考虑数据类型,groupByKey必须保证数据kv类型
// 2. groupBy按照指定的规则进行分组,groupByKey必须根据key对value分组
// 3. 返回结果类型
// groupByKey => (String, Iterable[Int]) 、key作为键
// groupBy => (String, Iterable[(String, Int)]) 、 那个指定规则的结果为键
// groupByKey算子将相同key数据的value分在一个组中
// TODO groupByKey也可以实现 WordCount ( 3 / 10 )
val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
val rdd2: RDD[(String, Int)] = rdd1.mapValues(_.size)
rdd2.collect.foreach(println)
sc.stop()
}
思考一个问题:reduceByKey和groupByKey的区别?
# 从shuffle的角度: reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。 # 从功能的角度: reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKeyaggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 取出每个分区内相同key的最大值然后分区间相加
// 【(a,1),(a,2),(b,3)】
// => 【(a, 2), (b, 3)】
// => 【 (a, 8), (b, 8) 】
// => 【(b, 5), (a, 6)】
// 【(b,4),(b,5),(a,6)】
val rdd = sc.makeRDD(
List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
),
2
)
// aggregateByKey算子存在函数柯里化
// 第一个参数列表中有一个参数
// 参数为零值,表示计算初始值 zero, z, 用于数据进行分区内计算
// 第二个参数列表中有两个参数
// 第一个参数表示 分区内计算规则
// 第二个参数表示 分区间计算规则
val rdd1 = rdd.aggregateByKey(5)(
(x, y) => {
math.max(x, y)
},
(x, y) => {
x + y
}
)
rdd1.collect.foreach(println)
sc.stop()
}
foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
val rdd = sc.makeRDD(
List(
("a",1),("a",2),("b",3),
("b",4),("b",5),("a",6)
),
2
)
// TODO aggregateByKey也可以实现WordCount ( 4 / 10 )
val rdd2 = rdd.aggregateByKey(0)(_+_, _+_)
// TODO foldByKey也可以实现WordCount ( 5 / 10 )
// TODO 如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化
val rdd3 = rdd.foldByKey(0)(_+_)
rdd3.collect.foreach(println)
combineByKey
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
// TODO 求每个key的平均值
val rdd = sc.makeRDD(
List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),
2
)
val rdd1 = sc.makeRDD(
List(
("a", (1,1)), ("a", 2), ("b", (3,1)),
("b", (4,1)), ("b", 5), ("a", (6,1))
),
2
)
// 这几种算子都做不到
// groupByKey() => total / cnt = avg
// reduceByKey() => total / cnt = avg
// aggregateByKey(z)(f1, f2) => total / cnt = avg
// foldByKey(z)(f1) => total / cnt = avg
// (a, 3)(b, 4)
// combineByKey算子有三个参数
// 第一个参数表示: 当第一个数据不符合我们的规则时,用于进行转换的操作
// 第二个参数表示: 分区内计算规则
// 第三个参数表示: 分区间计算规则
val rdd2 = rdd.combineByKey(
num => (num, 1),
(x : (Int, Int), y) => {
(x._1 + y, x._2 + 1)
},
( x : (Int, Int), y:(Int, Int) ) => {
(x._1 + y._1, x._2 + y._2)
}
)
rdd2.collect.foreach(println)
sc.stop()
}
sortByKey
在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(
List(
("a", 2), ("a", 1), ("c", 3), ("b", 4)
)
)
// ("a", 2) ("a", 1)("b", 4) ("c", 3)
// sortByKey算子就是按照key排序,不管value的值
val rdd1: RDD[(String, Int)] = rdd.sortByKey()
rdd1.collect.foreach(println)
sc.stop()
}
设置key为自定义类User
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(
List(
(new User(), 2), (new User(), 1), (new User(), 3), (new User(), 4)
)
)
// sortByKey算子就是按照key排序,不管value的值
val rdd1: RDD[(User, Int)] = rdd.sortByKey(false)
rdd1.collect.foreach(println)
sc.stop()
}
//排序的话必须实现一个Ordered特质
class User extends Ordered[User]{
override def compare(that: User): Int = {
1
}
}
join
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.local.dir", "e:/test")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(
List(
("a", 1), ("b", 2), ("c", 3)
)
)
val rdd2 = sc.makeRDD(
List(
("a", 4), ("d", 5), ("c", 6)
)
)
// spark中join操作主要针对于两个数据集中相同的key的数据连接
// join操作可能会产生笛卡尔乘积,可能会出现shuffle,性能比较差
// 所以如果能使用其他方式实现同样的功能,不推荐使用join
//val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2) |(a,(1,4))(c,(3,6))
// 主,从表
//val rdd3 = rdd1.leftOuterJoin(rdd2)
//val rdd4 = rdd1.rightOuterJoin(rdd2)
//val rdd5 = rdd1.fullOuterJoin(rdd2)
// connect + group 比较重要
val rdd6 = rdd1.cogroup(rdd2)
//rdd3.collect.foreach(println)
println("**********************")
//rdd4.collect.foreach(println)
println("**********************")
//rdd5.collect.foreach(println)
println("**********************")
rdd6.collect.foreach(println)
//(a,(CompactBuffer(1),CompactBuffer(4, 5, 6)))
//(b,(CompactBuffer(2),CompactBuffer()))
//(c,(CompactBuffer(3),CompactBuffer()))
sc.stop()
}
实操案例
-
数据准备
agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
-
需求描述
统计出每一个省份每个广告被点击数量排行的Top3
-
需求分析
-
功能实现
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Req")
val sc = new SparkContext(conf)
// TODO 统计出每一个省份每个广告被点击数量排行的Top3
// TODO 1. 读取数据文件,获取原始数据
// 1516609143867 6 7 64 16
val lines = sc.textFile("data/agent.log")
// TODO 2. 将原始数据进行结构的转换
// line => ((省份,广告),1)
val wordToOne = lines.map(
line => {
val datas = line.split(" ")
( (datas(1), datas(4)), 1 )
}
)
// TODO 3. 将转换结构后的进行统计
// ((省份,广告),1) => ((省份,广告),sum)
val wordToSum = wordToOne.reduceByKey(_+_)
// TODO 4. 将统计结果进行结构的转换,将省份独立出来
// ((省份,广告),sum) => (省份,(广告,sum))
val wordToTuple = wordToSum.map {
case ( (prv, adv), sum ) => {
( prv, (adv, sum) )
}
}
// TODO 5. 将数据按照省份进行分组
// (省份,List[(广告,sum), (广告1,sum1), (广告2,sum2)])
val groupRDD: RDD[(String, Iterable[(String, Int)])] = wordToTuple.groupByKey()
// TODO 6. 将分组后的数据,根据点击数量进行排行(降序)
// TODO 7. 将排序后的数据取前3
val top3 = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
// TODO 8. 将结果采集后打印在控制台上
top3.collect.foreach(println)
sc.stop()
}
另一种实现方式,先省份分组,再做广告的统计,排序
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Req")
val sc = new SparkContext(conf)
// TODO 统计出每一个省份每个广告被点击数量排行的Top3
// TODO 1. 读取数据文件,获取原始数据
// 1516609143867 6 7 64 16
val lines = sc.textFile("data/agent.log")
// TODO 2. 将原始数据进行结构的转换
// line => (省份,(广告,1))
val wordToOne = lines.map(
line => {
val datas = line.split(" ")
(datas(1), (datas(4), 1 ))
}
)
val groupRDD: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupByKey()
val top3 = groupRDD.mapValues(
iter => {
val wordCountMap: Map[String, Int] = iter.groupBy(_._1).mapValues(_.size)
wordCountMap.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
top3.collect.foreach(println)
sc.stop()
}
以上两个例子,推荐使用第一种方式,先统计分析,然后再做分组,这样性能获得到极大的提升。
也就是先reduceByKey,而不是GroupByKey
局部排序与全局排序
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Req")
val sc = new SparkContext(conf)
// TODO 统计出每一个省份每个广告被点击数量排行的Top3
// TODO 1. 读取数据文件,获取原始数据
// 1516609143867 6 7 64 16
val lines = sc.textFile("data/agent.log")
// TODO 2. 将原始数据进行结构的转换
// line => ((省份,广告),1)
val wordToOne = lines.map(
line => {
val datas = line.split(" ")
( (datas(1), datas(4)), 1 )
}
)
// TODO 3. 将转换结构后的进行统计
// ((省份,广告),1) => ((省份,广告),sum)
val wordToSum = wordToOne.reduceByKey(_+_)
// TODO 4. 将统计结果进行结构的转换,将省份独立出来
// ((省份,广告),sum) => (省份,(广告,sum))
val wordToTuple = wordToSum.map {
case ( (prv, adv), sum ) => {
( prv, (adv, sum) )
}
}
// TODO 5. 将数据按照省份进行分组
// (省份,List[(广告,sum), (广告1,sum1), (广告2,sum2)])
//val groupRDD: RDD[(String, Iterable[(String, Int)])] = wordToTuple.groupByKey()
// TODO 6. 将分组后的数据,根据点击数量进行排行(降序)
// TODO 7. 将排序后的数据取前3
// val top3 = groupRDD.mapValues(
// iter => {
// iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
// }
// )
// 【 (a, 1), (a, 2), (a, 3) 】 => 【(a, 2), (a, 3) 】
// 【 (a, 4), (a, 5), (a, 6) 】 => 【 (a, 5), (a, 6)】
// 【 (a, 7), (a, 8), (a, 9) 】 => 【(a, 8), (a, 9)】 => 【(a, 8), (a, 9)】
// 分区内排序取前3名
// 分区间排序取前3名
// 【(广告,sum), (广告,sum), (广告,sum), (广告,sum)】
//上面的toList很消耗内存,这种方式极大节省内存,分区内排序,然后再分区间排序
val top3 = wordToTuple.aggregateByKey(ArrayBuffer[(String, Int)]())(
(buff, t) => {
buff.append(t)
buff.sortBy(_._2)(Ordering.Int.reverse).take(3)
},
( buff1, buff2 ) => {
buff1.appendAll(buff2)
buff1.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
// TODO 8. 将结果采集后打印在控制台上
top3.collect.foreach(println)
sc.stop()
}
RDD行动算子
reduce,collect,count,first,take,tabkeOrderedSpark RDD方法分为2大类,其中一个是转换算子,一个为行动算子
行动算子在被调用时,会触发Spark作业的执行,collect算子就是行动算子,行动算子执行时,会构建新的作业
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(List(1,4,3,2),2)
// reduce算子
val i: Int = rdd.reduce(_ + _)
println(i) //10
// 将数据从Executor端采集回到Driver端
// collect会将数据全部拉取到Driver端的内存中,形成数据集合,可能会导致内存溢出
val ints: Array[Int] = rdd.collect()
println(ints.mkString(",")) //1,4,3,2
val l: Long = rdd.count()
println(l)//4
val i1: Int = rdd.first()
println(i1) //1
val ints1: Array[Int] = rdd.take(3)
println(ints1.mkString(","))//1,4,3
// 【1,2,3】
val ints2: Array[Int] = rdd.takeOrdered(3)
println(ints2.mkString(",")) //1,2,3 排完序后取3个
sc.stop()
}
aggregate
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(List(1,4,3,2),2)
val x: Int = rdd.aggregate(0)(_ + _, _ + _) // 10
// aggregate & aggregateByKey的区别?
// 1. 数据格式
// 2. aggregateByKey是一个转换算子,所以执行后会产生新的RDD
// aggregate是一个行动算子,所以执行后会得到结果
// 3. aggregateByKey执行计算时,初始值只会参与分区内计算
// aggregate执行计算时,初始值会参与分区内计算,也会参与分区间的计算
// 【1,4】,【3,2】
// 【5,1,4】,【5,3,2】
// 【10】【10】
// 【5, 10, 10】
val i: Int = rdd.aggregate(5)(_ + _, _ + _) // 25
val j: Int = rdd.fold(5)(_ + _)
val k: Int = rdd.reduce(_ + _)
println(i)
sc.stop()
}
countByKey
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(List(1,4,3,2),2)
// countByKey算子表示相同key出现的次数
val rdd1: RDD[(String, Int)] = rdd.map(("a", _))
// (a, 1), (a, 4), (a, 3), (a, 2)
// (a, 4) => (a, 1),(a, 1),(a, 1),(a, 1)
// (a, 10)
// TODO countByKey算子可以实现 WordCount (7 / 10)
val map: collection.Map[String, Long] = rdd1.countByKey()
println(map)
sc.stop()
}
countByValue
统计每个值出现的次数
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
//val rdd = sc.makeRDD(List(1,1,1,1,2,2,3),2) (1,4次),(2,两次)
val rdd = sc.makeRDD(List(
("a", 2), ("a", 3)
),2)
// countByKey算子表示相同key出现的次数
//val rdd1: RDD[(String, Int)] = rdd.map(("a", _))
// countByValue中Value不是KV键值对中的v的意思
// 单value,双value,K-V
// TODO countByValue可以实现 WordCount (8 / 10)
// ("a", 2) => "a", "a"
// ("a", 3) => "a", "a", "a"
// ( a, 5 )
val map = rdd.countByValue()
println(map)
sc.stop()
}
save
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(List(
("a", 2), ("a", 3)
),2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
sc.stop()
}
foreach
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(
List(1,4,3,2),2
)
// collect是按照分区号码进行采集,collect会按照分区号采集,先采集0号分区
rdd.collect.foreach(println) //这个是数组的方法,单点的内存操作 1432
println("****************************")
rdd.foreach(println) //这个是rdd的,是算子(数据可能打乱,分布式的)
sc.stop()
}
RDD序列化
闭包检测从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(
List[Int](),2
)
val user = new User() // Driver
rdd.foreach(
num => {
println(user.age + num) // Executor
}
)
// Scala语法 : 闭包
// Spark在执行算子时,如果算子的内部使用了外部的变量(对象),那么意味着一定会出现闭包
// 在这种场景中,需要将Driver端的变量通过网络传递给Executor端执行,这个操作不用执行也能判断出来
// 可以在真正执行之前,对数据进行序列化校验,
// Spark在执行作业前,需要先进行闭包检测功能。
sc.stop()
}
class User {
val age = 30
}
def main(args: Array[String]): Unit = {
// 一个应用程序, Driver程序
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 算子 - 行动
val rdd = sc.makeRDD(List(
"Hello", "Hive", "Spark", "Scala"
))
val s = new Search("S")
s.filterByQuery(rdd).foreach(println)
sc.stop()
}
// 加上case也不会报错,因为样例类中默认会添加很多方法,其中旧继承了序列化
class Search( q : String ) {
def filterByQuery( rdd : RDD[String] ): RDD[String] = {
// 算子外 -> Driver
// 算子内 -> Executor
val s : String = this.q; //
rdd.filter(_.startsWith(s) ) //这种方式不会报错,如果直接是_.startwith(q)报错
}
}
// 先讲一个scala语法,如果一个构造参数在一个普通方法中调用,那么这个参数会被编译为这个类的属性
class Test( name:String ) {
def test(): Unit = {
println(name) //this.name
}
}
Kryo序列化框架
RDD依赖关系 RDD血缘关系参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
注意:即使使用Kryo序列化,也要继承Serializable接口。
相邻两个RDD之间的关系,称之为依赖关系,多个连续的依赖关系称之为血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。相当于一个容错机制
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val lines = sc.textFile("data/word.txt")
println(lines.toDebugString)
println("*******************************")
val words = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("*******************************")
val wordToOne = words.map((_,1))
println(wordToOne.toDebugString)
println("*******************************")
val wordToCount = wordToOne.reduceByKey(_+_)
println(wordToCount.toDebugString)
println("*******************************")
wordToCount.collect().foreach(println)
sc.stop()
}
=============================================
(3) data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
| data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
(3) MapPartitionsRDD[2] at flatMap at Test1.scala:16 []
| data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
| data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
(3) MapPartitionsRDD[3] at map at Test1.scala:20 []
| MapPartitionsRDD[2] at flatMap at Test1.scala:16 []
| data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
| data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
(3) ShuffledRDD[4] at reduceByKey at Test1.scala:24 []
+-(3) MapPartitionsRDD[3] at map at Test1.scala:20 []
| MapPartitionsRDD[2] at flatMap at Test1.scala:16 []
| data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
| data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
依赖关系
依赖关系分为两类:
窄依赖(OneToOneDependency)
宽依赖(ShuffleDependency)
上游RDD的一个分区的数据被下游的RDD的一个分区所独享,称之为窄依赖 上游RDD的一个分区的数据被下游的RDD的多个分区所共享,称之为宽依赖,会打乱,shuffle
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val lines = sc.textFile("data/word.txt")
println(lines.dependencies)
println("*******************************")
val words = lines.flatMap(_.split(" "))
println(words.dependencies)
println("*******************************")
val wordToOne = words.map((_,1))
println(wordToOne.dependencies)
println("*******************************")
val wordToCount = wordToOne.reduceByKey(_+_)
println(wordToCount.dependencies)
println("*******************************")
wordToCount.collect().foreach(println)
sc.stop()
}
==================================================
List(org.apache.spark.OneToOneDependency@18d910b3)
*******************************
List(org.apache.spark.OneToOneDependency@59dc36d4)
*******************************
List(org.apache.spark.OneToOneDependency@4d41ba0f)
*******************************
List(org.apache.spark.ShuffleDependency@2daf06fc)
*******************************
RDD持久化
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val lines = sc.makeRDD(
List("Hadoop Hive Hbase", "Spark scala Java")
)
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map(
t => {
println("*************************")
(t, 1)
}
)
// 设定数据持久化
// cache方法可以将血缘关系进行修改,添加一个和缓存相关的依赖关系
// cache操作不安全。
wordToOne.cache()
// 如果持久化的话,那么持久化的文件只能自己用。而且使用完毕后, 会删除
wordToOne.persist(StorageLevel.DISK_ONLY_2)
val wordToCount = wordToOne.reduceByKey(_+_)
println(wordToCount.toDebugString)
wordToCount.collect()//.foreach(println)
println("--------------------------------------------")
// val rdd2: RDD[(Int, Iterable[(String, Int)])] = wordToOne.groupBy(_._2)
// rdd2.collect()
println(wordToCount.toDebugString)
sc.stop()
}
RDD检查点
所谓的检查点其实就是通过将RDD中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
sc.setCheckpointDir("cp") //设置检查点目录
val lines = sc.makeRDD(
List("Hadoop Hive Hbase", "Spark scala Java")
)
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map(
t => {
println("*************************")
(t, 1)
}
)
// Spark可以将中间的计算结果保存到检查点中,让其他的应用使用数据
// Checkpoint directory has not been set in the SparkContext
wordToOne.checkpoint()
val wordToCount = wordToOne.reduceByKey(_+_)
println(wordToCount.toDebugString)
wordToCount.collect()//.foreach(println)
println("--------------------------------------------")
// val rdd2: RDD[(Int, Iterable[(String, Int)])] = wordToOne.groupBy(_._2)
// rdd2.collect()
println(wordToCount.toDebugString)
sc.stop()
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
sc.setCheckpointDir("cp")
val lines = sc.makeRDD(
List("Hadoop Hive Hbase", "Spark scala Java")
)
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map(
t => {
println("*************************")
(t, 1)
}
)
// Spark可以将中间的计算结果保存到检查点中,让其他的应用使用数据
// Checkpoint directory has not been set in the SparkContext
// 检查点可以切断血缘关系。
// 检查点为了数据的安全,会重新再执行一遍作业,所以会执行2次
// 为了解决这个问题,可以将检查点和缓存联合使用
wordToOne.cache()
wordToOne.checkpoint()
val wordToCount = wordToOne.reduceByKey(_+_)
// println(wordToCount.toDebugString)
wordToCount.collect()//.foreach(println)
println("--------------------------------------------")
val rdd2: RDD[(Int, Iterable[(String, Int)])] = wordToOne.groupBy(_._2)
rdd2.collect()
//println(wordToCount.toDebugString)
sc.stop()
}
缓存和检查点的区别
1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。 2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。 3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。RDD分区器
Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。
自定义分区器
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val lines = sc.makeRDD(
List(
("nba", "xxxxxx"),
("cba", "11111"),
("nba", "yyyyy"),
("wnba", "22222")
),2
)
val rdd2: RDD[(String, String)] = lines.partitionBy(new MyPartitioner())
rdd2.saveAsTextFile("output")
sc.stop()
}
// 自定义分区器
// 1. 继承Partitioner
// 2. 重写方法
class MyPartitioner extends Partitioner {
// TODO 分区数量
override def numPartitions: Int = {
3
}
// TODO 根据数据的key返回所在的分区编号,从0开始
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "cba" => 1
case "wnba" => 2
}
}
}
RDD文件读取与保存
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
//val rdd1: RDD[String] = sc.textFile("output")
//rdd1.collect().foreach(println)
//val rdd1 = sc.objectFile[(String, Int)]("output1") 泛型(原来存进去是什么类型)
//rdd1.collect().foreach(println)
val rdd2 = sc.sequenceFile[String, Int]("output2") 泛型
rdd2.collect().foreach(println)
sc.stop()
}
累加器
累加器:分布式共享只写变量
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(
List(1,2,3,4),2
)
var sum = 0; // Driver
rdd.foreach(
num => {
sum = sum + num; // Executor
}
)
println(sum); // 0,这是Driver端的,发现Executor端的sum无法传回Driver
sc.stop()
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(
List(1,2,3,4),2
)
// 1.创建累加器
val sum = sc.longAccumulator("sum")
//sc.collectionAccumulator()
//sc.doubleAccumulator()
rdd.foreach(
num => {
// 2.使用累计器
sum.add(num)
}
)
// 3.获取累加器的结果
println(sum.value);
sc.stop()
}
自定义累加器完成wordCount
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(
List(
"scala",
"scala",
"scala",
"scala",
"scala",
"scala",
"spark",
"spark",
"spark",
"spark"
)
)
// TODO 创建累加器
val acc = new WordCountAccumulator()
// TODO 向Spark进行注册
sc.register(acc, "wordCount")
rdd.foreach(
word => {
// TODO 将单词放入到累加器中
acc.add(word)
}
)
// TODO 获取累加器的累加结果
println(acc.value)
sc.stop()
}
// 自定义数据累加器
// 1. 继承AccumulatorV2
// 2. 定义泛型
// IN : String 输入类型
// OUT : Map[K, V] 输出类型
// 3. 重写方法(3 + 3)
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
private val wcMap = mutable.Map[String,Int]()
// 判断累加器是否为初始状态
// copyAndReset must return a zero value copy
// TODO 3. true
override def isZero: Boolean = {
wcMap.isEmpty
}
// TODO 1.
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
new WordCountAccumulator()
}
// 重置累加器
// TODO 2.
override def reset(): Unit = {
wcMap.clear()
}
// 从外部向累加器中添加数据
override def add(word: String): Unit = {
val oldCnt = wcMap.getOrElse(word, 0)
wcMap.update(word, oldCnt + 1)
}
// 合并两个累加器的结果
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
other.value.foreach {
case (word, cnt) => {
val oldCnt = wcMap.getOrElse(word, 0)
wcMap.update( word, oldCnt + cnt )
}
}
}
// 将结果返回到外部
override def value: mutable.Map[String, Int] = wcMap
}
广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(
List(
("a", 1), ("b", 2)
)
)
// val rdd2 = sc.makeRDD(
// List(
// ("a", 3), ("b", 4)
// )
// )
val map = mutable.Map[String, Int](
("a", 3), ("b", 4)
)
//包装成广播变量,这是提供一个结构
val bcMap: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
val rdd3 = rdd1.map {
case ( word, cnt ) => {
//bcMap映射,但是存在性能问题,如果分区过多,存在大量的冗余,寻求一个共享的数据
val cnt2 = bcMap.value.getOrElse(word, 0)
(word, (cnt, cnt2))
}
}
rdd3.collect.foreach(println)
sc.stop()
}



