算子使用
package SparkTest
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SparkCore {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]")
val sparkContext = new SparkContext(conf)
val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rddData: RDD[Int] = sparkContext.parallelize(array, 3)
// sample 算子
// 第一个参数为抽取后是否放回, true 放回, false 不放回
// 第二个参数是抽样比例,但是在量小的时候不一定准,需多次抽样之后的平均量值接近该比例
// 第三个参数为抽样种子 可用可不用,一般设置之后,若数据源不变,那么后续每次随机抽取的结果固定,若不设置该值,那么每次随机抽取的结果不一定
val sample: RDD[Int] = rddData.sample(true, 0.4)
println("sample 算子示例==================================================== : n" + sample.collect().toBuffer)
//union 算子 并集
val union: RDD[Int] = rddData.union(sample)
println("union 算子示例==================================================== : n" + union.collect().toBuffer)
//intersection 交集
val intersection: RDD[Int] = rddData.intersection(sample)
println("intersection 算子示例==================================================== : n" + intersection.collect().toBuffer)
// distinct 去重
val distinct: RDD[Int] = union.distinct(3)
println("distinct 算子示例==================================================== : n" + distinct.collect().toBuffer)
//groupByKey 算子输入元素必须为 k[T] v[T1] 对,返回类型为 (k[T],Iterable[T1])
//Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好.
val groupByKey: RDD[(Int, Iterable[String])] = union.map(tp => (tp, tp + "_tp")).groupByKey()
println("groupByKey 算子示例==================================================== : n" + groupByKey.collect().toBuffer)
//reduceByKey 算子 map 端的预聚合方式无法控制,但是可以操作最后 reduce 端的聚合方式
val reduceByKey: RDD[(Int, String)] = union.map(tp => (tp, tp + "_tp")).reduceByKey((v1, v2) => {
(v1.split("_", -1)(0).toInt + v2.split("_", -1)(0).toInt).toString
})
println("reduceByKey 算子示例==================================================== : n" + reduceByKey.collect().toBuffer)
//aggregateByKey 算子 map 端的预聚合方式以及最后 reduce 端的聚合方式均可控制,且是在给定的初始值基础之上进行
val agg: RDD[(String, Int)] = sparkContext.parallelize(Array(("a", 1), ("a", 2), ("a", 3), ("c", 1), ("c", 2), ("c", 3), ("b", 6), ("b", 8), ("b", 10)), 3)
val aggregateByKey: RDD[(String, Int)] = agg.aggregateByKey(2)(_ * _, _ + _)
println("aggregateByKey 算子示例==================================================== : n" + aggregateByKey.collect().toBuffer)
//三者区别
// 从上面的区别上,我们可以看出,在数据量少的数据集,可以使用groupByKey,但对于数据量大的集,使用groupByKey的话,会有大量的数据经过shuffle过程,占用大量的内存,所以,我们在代码要尽量避免使用groupByKey,用reduceByKey代替。
// reduceByKey可以认为是aggregateByKey的简化版
// aggregateByKey,分为三个参数,,多提供了一个函数,Seq Function
// 就是说自己可以控制如何对每个partition中的数据进行先聚合,类似于mapreduce中的,map-side combine 然后才是对所有partition中的数据进行全局聚合
// sortByKey 算子 true 升序, false 降序
val sortByKey: RDD[(String, Int)] = agg.sortByKey(false)
println("sortByKey 算子示例==================================================== : n" + sortByKey.collect().toBuffer)
// join 算子
val join1: RDD[(Int, Int)] = sample.map(tmp => (tmp, 3))
val j2: RDD[(Int, Int)] = rddData.map(t => (t, 2))
val leftOuterJoin: RDD[(Int, (Int, Option[Int]))] = j2.leftOuterJoin(join1).filter(_._2._2.nonEmpty)
println("leftOuterJoin 算子示例==================================================== : n" + leftOuterJoin.collect().toBuffer)
//cogroup 算子 类似于 join 算子,但是 join 算子是炸开每一条,而 cogroup 是预先将各自算子里的相同 key 聚合为一个迭代器,然后再 join
//也有点类似于 groupByKey 但不同于 groupByKey 的是传入的参数个数以及方式不同
val nameClass: RDD[(String, String)] = sparkContext.parallelize(Array(("xiaoming", "一班"), ("小红", "二班"), ("xiaolan", "三班"), ("小绿", "四班")))
val nameScore: RDD[(String, String)] = sparkContext.parallelize(Array(("xiaoming", "75"), ("小绿", "82"), ("xiaolan", "90"), ("小红", "100"), ("小红", "0")))
val scoreResult: RDD[(String, (Iterable[String], Iterable[String]))] = nameClass.cogroup(nameScore)
println("cogroup 算子示例==================================================== : n" + scoreResult.collect().toBuffer)
//cartesian 算子 笛卡儿积 : 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。
val cartesian: RDD[(Int, Int)] = rddData.cartesian(sample)
println("cartesian 算子示例==================================================== : n" + cartesian.collect().toBuffer)
// coalesce 算子 : Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。
// repartition 函数其实就是coalesce函数第二个参数为true的实现
//查看 rddData 每个分区号对应的值
println("coalesce 算子示例==================================================== : n")
val mapPartitionsWithIndex: RDD[Int] = rddData.mapPartitionsWithIndex {
(index, ite) => {
println(index + "t" + ite.toList)
}
ite
}
println(mapPartitionsWithIndex.collect().toBuffer)
val coalesceRdd: RDD[Int] = rddData.coalesce(2, false)
val coalesce: RDD[Int] = coalesceRdd.mapPartitionsWithIndex {
(index, ite) => {
println(index + "t" + ite.toList)
}
ite
}
println(coalesce.collect().toBuffer)
// repartition 算子 : Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。
val repartition: RDD[Int] = rddData.repartition(5)
println("repartition 算子示例==================================================== : n" + repartition.collect().toBuffer)
//action 算子=====================================================================================================
//reduce 算子 : 使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素
val reduceRDD: Int = rddData.reduce(_ * _)
println("reduceRDD 算子示例==================================================== : n" + reduceRDD)
//collect 算子:在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素,这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的。
val collectArr: Array[Int] = rddData.collect()
println("collectArr 算子示例==================================================== : n" + collectArr.toBuffer)
//count 算子: 返回 dataset 中元素的个数。
println("count 算子示例==================================================== : n" + rddData.count())
//first : 返回 dataset 中的第一个元素(类似于 take(1)。
//take : 将数据集中的前 n 个元素作为一个 array 数组返回。
val takeArr: Array[Int] = rddData.take(3)
println("take 算子示例==================================================== : n" + takeArr.toBuffer)
//takeSample : 对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。
val takeSampleArr: Array[Int] = rddData.takeSample(true, 3)
println("takeSample 算子示例==================================================== : n" + takeSampleArr.toBuffer)
//takeOrdered : 返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。
val takeOrderedArr: Array[Int] = rddData.takeOrdered(10)
println("takeOrdered 算子示例==================================================== : n" + takeOrderedArr.toBuffer)
//saveAsTextFile : 将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。
rddData.saveAsTextFile("F:\shuzilm\document\Source--ng\demandFile\saveAsTextFile_test.txt")
//countByKey : 仅适用于(K,V)类型的 RDD。返回具有每个 key 的计数的(K , Int)pairs 的 hashmap。
val wordRdd: RDD[(String, String)] = sparkContext.textFile("F:\shuzilm\document\Source--ng\demandFile\Rdd_test.txt").flatMap(_.split(" ")).map(word => (word, "value"))
val countByKey: collection.Map[String, Long] = wordRdd.countByKey()
println("countByKey 算子示例==================================================== : n" + countByKey)
sparkContext.stop()
}
}
结果参考
sample 算子示例==================================================== :
ArrayBuffer(1, 4, 8, 10)
union 算子示例==================================================== :
ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 4, 8, 10)
intersection 算子示例==================================================== :
ArrayBuffer(4, 1, 10, 8)
distinct 算子示例==================================================== :
ArrayBuffer(6, 3, 9, 4, 1, 7, 10, 8, 5, 2)
groupByKey 算子示例==================================================== :
ArrayBuffer((6,CompactBuffer(6_tp)), (1,CompactBuffer(1_tp, 1_tp)), (7,CompactBuffer(7_tp)), (8,CompactBuffer(8_tp, 8_tp)), (2,CompactBuffer(2_tp)), (3,CompactBuffer(3_tp)), (9,CompactBuffer(9_tp)), (4,CompactBuffer(4_tp, 4_tp)), (10,CompactBuffer(10_tp, 10_tp)), (5,CompactBuffer(5_tp)))
reduceByKey 算子示例==================================================== :
ArrayBuffer((6,6_tp), (1,2), (7,7_tp), (8,16), (2,2_tp), (3,3_tp), (9,9_tp), (4,8), (10,20), (5,5_tp))
aggregateByKey 算子示例==================================================== :
ArrayBuffer((c,12), (a,12), (b,960))
sortByKey 算子示例==================================================== :
ArrayBuffer((c,1), (c,2), (c,3), (b,6), (b,8), (b,10), (a,1), (a,2), (a,3))
leftOuterJoin 算子示例==================================================== :
ArrayBuffer((4,(2,Some(3))), (1,(2,Some(3))), (10,(2,Some(3))), (8,(2,Some(3))))
cogroup 算子示例==================================================== :
ArrayBuffer((小绿,(CompactBuffer(四班),CompactBuffer(82))), (xiaoming,(CompactBuffer(一班),CompactBuffer(75))), (xiaolan,(CompactBuffer(三班),CompactBuffer(90))), (小红,(CompactBuffer(二班),CompactBuffer(100, 0))))
cartesian 算子示例==================================================== :
ArrayBuffer((1,1), (2,1), (3,1), (1,4), (2,4), (3,4), (1,8), (1,10), (2,8), (2,10), (3,8), (3,10), (4,1), (5,1), (6,1), (4,4), (5,4), (6,4), (4,8), (4,10), (5,8), (5,10), (6,8), (6,10), (7,1), (8,1), (9,1), (10,1), (7,4), (8,4), (9,4), (10,4), (7,8), (7,10), (8,8), (8,10), (9,8), (9,10), (10,8), (10,10))
coalesce 算子示例==================================================== :
0 List(1, 2, 3)
1 List(4, 5, 6)
2 List(7, 8, 9, 10)
ArrayBuffer()
0 List(1, 2, 3)
1 List(4, 5, 6, 7, 8, 9, 10)
ArrayBuffer()
repartition 算子示例==================================================== :
ArrayBuffer(6, 10, 1, 2, 7, 3, 4, 8, 5, 9)
reduceRDD 算子示例==================================================== :
3628800
collectArr 算子示例==================================================== :
ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
count 算子示例==================================================== :
10
take 算子示例==================================================== :
ArrayBuffer(1, 2, 3)
takeSample 算子示例==================================================== :
ArrayBuffer(4, 5, 3)
takeOrdered 算子示例==================================================== :
ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
countByKey 算子示例==================================================== :
Map(12 -> 9, 李大四 -> 3, 男 -> 12, 王芳 -> 3, 19 -> 6, 张大三 -> 3, 李四 -> 3, 50 -> 6, 13 -> 9, 女 -> 6, english -> 6, 80 -> 1, math -> 6, 25 -> 6, 60 -> 4, 20 -> 6, 70 -> 7, 王小芳 -> 3, 张三 -> 3, chinese -> 6)



