6、RDD的依赖关系
分为窄依赖(独生子)和宽依赖
-
窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用,窄依赖
我们形象的比喻为独生子女
-
宽依赖指的是多个子 RDD 的 Partition 会依赖同一个父 RDD 的 Partition,会引起
shuffle,总结:宽依赖我们形象的比喻为超生
7、行动算子
7.1 reduce
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("demo12")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
def reduce(f: (T, T) => T): T =
reduceByKey
def reduceByKey(f: (V, V) => V): RDD[(K,V)] =
val res: Int = rdd.reduce(_ + _)//1+2+3+4+5+6+7+8+9+10=55
println(res)//55
7.2 collect
通常在测试时用来观察RDD中的元素
// 2. collect // 通常在测试时用来观察RDD中的元素 //def collect(): Array[T] = val array = rdd.collect() println(util.Arrays.toString(array))
7.3 count
计数
// 3. count // 计算RDD中的元素个数并返回 val count = rdd.count() println(count)
7.4 first
//输出最大的数 val res4 = sc.makeRDD(List(1, 345, 6, 2346, 7453)) .sortBy(x => x, false)//降序排序 .first() println(res4)
7.5 take(n)
n 为取几个元素
val res5 = sc.makeRDD(List(1, 345, 6, 2346, 7453)) .sortBy(x => x, false) .take(1) println(res5.toList)
注意:take(1) 和 head 都是取第一个元素,但是不同的是take是行动算子,head是转换算子,并且,take 的返回值是数组,而head的返回值是 基本类型(int,long...)
7.6 takeOrdered(n)
n 为取几个元素,元素默认按照降序排序
val res6 = sc.makeRDD(List(1, 345, 6, 2346, 7453)) .takeOrdered(3) println(res6.toList) //List(1, 6, 345)
7.7 takeSample(withReplacement,num, [seed])
与sample类似,可以将RDD中的元素抽取并装入Array返回,takeSample会稳定抽取固定个数的元素
// 7. takeSample val rdd7 = sc.makeRDD(1 to 10) // def sample( // withReplacement: Boolean, // fraction: Double, // seed: Long = Utils.random.nextLong): RDD[T] = rdd7.sample(false, 0.5) // 与sample类似 可以将RDD中的元素抽取并装入Array返回 // takeSample会稳定抽取固定个数的元素 // def takeSample( // withReplacement: Boolean, // num: Int, // seed: Long = Utils.random.nextLong): Array[T] = rdd7.takeSample(true, 3) .foreach(println)
7.8 输出到文件
// rdd.saveAsTextFile("")
// rdd.saveAsObjectFile("")
// 只有二元组作为泛型的RDD才能保存成hadoop序列文件
// rdd.map(x => (x, x)).saveAsSequenceFile("")
7.9 countByKey
根据key值计数
7.10 foreach
打印输出
4、DAGDAG(Directed Acyclic Graph)叫做有向无环图,原始的 RDD 通过一系列的转换就就形成了 DAG,根据 RDD 之间的依赖关系的不同将 DAG 划分成不同的 Stage,对于窄依赖,partition 的转换处理在 Stage 中完成计算。对于宽依赖,由于有 Shuffle 的存在,只能在 parent RDD 处理完成后,才能开始接下来的计算,因此宽依赖是划分 Stage 的依据。
4.1. 任务分配
RDD 任务切分中间分为:Application、Job、Stage 和 Task
1)Application:初始化一个 SparkContext 即生成一个 Application
2)Job:一个 Action 算子就会生成一个 Job
3)Stage:根据 RDD 之间的依赖关系的不同将 Job 划分成不同的 Stage,遇到一个宽依赖
则划分一个 Stage。
4)Task:Stage 是一个 TaskSet,将 Stage 划分的结果发送到不同的 Executor 执行即为一个
Task。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
错误-
执行Spark-shell或者Spark-submit时出现如下错误
21/07/02 10:55:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-
spark-env.sh中添加如下配置
export HADOOP_HOME=/opt/hadoop-2.7.7 export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native
-
配置分发到集群
-
更新资源



