| Transformation | 描述 |
|---|---|
| map(func) | 对原RDD中的元素进行一一映射,返回新的RDD |
| filter(func) | 对原RDD过滤,当 func 为 true 时则保留该元素,返回新的RDD |
| flatMap(func) | 扁平化(降维)的 map,每个参数可映射到多个输出项,返回新的RDD |
| mapPartitions(func) | 对每个分区的数据单独进行 map,func 必须是 Iterator |
| mapPartitionsWithIndex(func) | 类似于 mapPartitions,但提供了一个表示分区数的整数值 |
| sample(withReplace, fraction, seed) | 使用给定的随机生成器的种子对数据进行采样 |
| union(otherDataset) | 将两个泛型相同的RDD合并(求并集),返回新的RDD |
| instersection(otherDataset) | 将两个泛型相同的RDD相交(求交集),返回新的RDD |
| distinct([numPartitions]) | 将原RDD去重,返回新的RDD |
| groupByKey([numPartitions]) | 对 (K, V) 进行调用,返回 (K, Iterator) 泛型的新RDD |
| reduceByKey(func, [numPartitions]) | 对 (K, V) 进行调用,将相同键的值进行合并,返回 (K, V) 泛型的新RDD |
| aggregateByKey(zeroValue)(SeqOP, combOp, [numPartitions]) | 对 (K, V) 进行调用,返回 (K, U) 泛型的RDD,其中每个键的值使用给定的组合函数和中性“零”值聚合 |
| sortByKey([ascending], [numPartitions]) | 实现对 (K, V) 按照 K 进行排序 |
| join(otherDataset, [numPartitions]) | 对泛型为 (K, V ) 和 (K, W) 的两个数据集,返回 (K, (V, W)) |
| cogroup(otherDataset, [numPartitions]) | 对泛型为 (K, V ) 和 (K, W) 的两个数据集,返回 (K, (Iterator, Iterable)) |
| cartesian(otherDataSet) | 当调用泛型为 T 和 U 的数据集时,返回一个 (T, U) 对(所有元素对)的数据集。 |
| pipe(command, [envVars]) | 通过 shell 命令来管理管道 RDD 的每个分区 |
| coalesce(numPartitions) | 将 RDD 中的分区减少到指定值 |
| repartition(numPartitions) | 重新设置分区数,通过网络shuffle打乱数据重新分区 |
| repartitionAndSortWithinPartitions(partitioner) | 根据指定的分区数器对RDD重新分区,并在每个结果分区内,按键对记录进行排序 |
键值对泛型的 RDD 可以通过 partitionBy 指定分区器。
Spark 默认实现了两种分区器:HashPartitioner、RangePartitioner,也可以自定义分区器。
- HashPartitioner:根据 key 的 hashCode 返回值对分区数取模
- 优势:可以将相同 key 的元素分到同一分区,方便 byKey 的操作
- 劣势:如果某些相同 key 的元素较多,容易造成数据倾斜
- RangePartitioner:使用抽样方法,随机抽样并轮询分发数据到不容的分区
- 优势:发分区后每个RDD中的元素数量相差无几
- 劣势:会将数据打乱,如需 byBey 操作会重新进行 shuffle
自定义分区器:
- 使用匿名类的方式自定义分区器
rdd.map(x => (x,x))
.partitionBy(new Partitioner {
// 设置分区数量
override def numPartitions: Int = 2
// 根据 key 计算出分区编号
override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
})
- 单独构造类的方式自定义分区器
- 继承 Partitioner 抽象类
- 重写其中的 numPartitions、getPartition 分发
rdd.map(x => (x, x))
.partitionBy(new DefinePartition(2))
class DefinePartition(num: Int) extends Partitioner {
override def numPartitions: Int = num
override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
}
1.2 ByKey使用 mapPartitionsWithIndex 方法,还能获取分区编号
在上面可以看到很多 ByKey 的算子,这些算子都适配于 (K, V) 泛型的 RDD,此类算子底层都是基于 combineByKeyWithClassTag 实现。
combineByKeyWithClassTag 中的 WithClassTag 相当于一种泛型检测机制,在该算子之上有一个简单的继承,名为combineByKey。
def combineByKey[C](
createCombiner: V => C,
mergevalue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergevalue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}
combineByKey 还是在调用 combineByKeyWithClassTag,为了方便,看此方法就可以了!
combineByKey 有3个重要的参数:
- createCombiner: V => C:拿到第一个value,创建一个(任意类型的)聚合器对象
- mergevalue: (C, V) => C:将 value 与上一次计算完的 C 进行合并,如果拿到 value 没有聚合器,则先进行 createCombiner
- mergeCombiners: (C, C) => C:将多个 C 进行合并
例子:
rdd = sc.makeRDD(Seq(1, 1, 1, 2, 2, 3))
rdd.map(x => (x, 1)) // 此时 value 的类型为 Int
.combineByKey[Double](
(V: Int) => V.toDouble,
(C: Double, V: Int) => C + V,
(C1: Double, C2: Double) => C1 + C2)
.foreach(println)
// 输出: 可以看到 value 的类型变为了 Double (1,3.0) (2,2.0) (3,1.0)
combineByKey 与 reduceByKey 的区别是前者聚合完可以改变 value 的类型,而后者不行;reduceByKey 实则就是调用 combineByKey,只不过不改变 V 的类型!如下源码:
def reduceByKey(
partitioner: Partitioner,
func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) // value 的类型并没有改变
}
2. RDD Actions
| Action | 描述 |
|---|---|
| reduce(func) | 利用函数 func 将数据进行聚合 |
| collect() | 对数据集中的元素收集并返回 |
| count() | 统计数据集中元素的数量 |
| first() | 返回数据集中的第一个元素 |
| take(n) | 返回数据集中前n个元素 |
| takeSample(withReplacement, num, [seed]) | 对数据集进行随机取样,返回新的RDD |
| takeOrdered(n, [ordering]) | 使用自然排序或自定义排序数据集中的前n个元素 |
| saveAsTextFile(path) | 将数据集中的元素保存在文本中 |
| saveAsObjectFile(path) | 使用 Java 序列化将元素保存在文件中 |
| saveAsSquenceFile(path) | 使用 Hadoop SequenceFile 写入本地文件系统 |
| countByKey() | 适用于 (K, V) 泛型的数据集,对数据集按照 K 进行计数,返回 (K, Int) |
| foreach(func) | 通常用于遍历元素,会对数据集中的每一个元素进行 func |
❤️ END ❤️



