* reduce
// Demonstrates the `reduce` action: adds up the integers 1 to 100 and prints the sum.
@Test def reduceTest(): Unit = {
  // Build an RDD from a local range, split across 2 partitions.
  val numbers: RDD[Int] = sc.parallelize(1 to 100, 2)
  // Combine all elements pairwise with addition.
  val total: Int = numbers.reduce((left, right) => left + right)
  // Print the aggregated value.
  println(total)
}
* fold
// Demonstrates the `fold` action with a non-neutral zero value of 10.
// Spark applies the zero value once per partition and once more when merging
// the partition results, so with 2 partitions the printed value is 5050 + 3 * 10.
@Test def foldTest(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(1 to 100, 2)
  val folded: Int = numbers.fold(10)((acc, value) => acc + value)
  println(folded)
}
* aggregate
// Demonstrates the `aggregate` action with different per-partition and merge functions:
// inside each partition the running maximum is kept (starting from 80), and the
// partition results are then added together (again starting from 80).
@Test def aggregateTest(): Unit = {
  // Source data: 1 to 100 in 2 partitions.
  val numbers: RDD[Int] = sc.parallelize(1 to 100, 2)
  // seqOp = max within a partition, combOp = sum across partitions.
  val aggregated: Int = numbers.aggregate(80)(
    (currentMax, value) => Math.max(currentMax, value),
    (left, right) => left + right
  )
  println(aggregated)
}
* collect
// Demonstrates the `collect` action: brings every element of the RDD back to the
// driver as a local array and prints it.
// (Previously this block was an exact copy of `aggregateTest`, which both duplicated
// that method name and never exercised `collect`.)
@Test def collectTest(): Unit = {
  // 1. Prepare an RDD from a local range, split across 2 partitions.
  val rdd1: RDD[Int] = sc.parallelize(1 to 100, 2)
  // 2. Materialise all elements on the driver.
  val res: Array[Int] = rdd1.collect()
  // 3. Print the elements on one line.
  println(res.mkString(", "))
}
* collectAsMap
// Demonstrates the `collectAsMap` action: keys each name by its length and returns
// the key/value pairs to the driver as a local map.
@Test def collectAsMap(): Unit = {
  // Pair every string with its length as the key.
  val rdd: RDD[(Int, String)] = sc.parallelize(Array("Lily", "Uncle Wang", "Polly")).keyBy(_.length)
  val res: collection.Map[Int, String] = rdd.collectAsMap()
  // Show the collected map; the method body was previously left unterminated.
  println(res)
}
* count
// Demonstrates the `count` action: prints how many elements the RDD holds.
@Test def countTest(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(1 to 100)
  val total: Long = numbers.count()
  println(total)
}
* countByKey
// Demonstrates the `countByKey` action: keys each name by its length and prints
// how many names share each length.
@Test def countByKeyTest(): Unit = {
  val names: RDD[String] = sc.parallelize(Array("Jim", "Tom", "Lily", "Lucy", "Polly", "Snoppy"))
  // Use the string length as the key of each pair.
  val byLength: RDD[(Int, String)] = names.keyBy(name => name.length)
  val counts: collection.Map[Int, Long] = byLength.countByKey()
  println(counts)
}
* take
// Demonstrates the `take` action: fetches the first 3 elements and prints each one.
@Test def takeTest(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7))
  val firstThree: Array[Int] = numbers.take(3)
  for (value <- firstThree) println(value)
}
* takeSample
// Demonstrates the `takeSample` action: draws 10 random elements, with replacement,
// from the integers 1 to 100 and prints each one.
@Test def takeSample(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(1 to 100)
  val sample: Array[Int] = numbers.takeSample(withReplacement = true, num = 10)
  for (value <- sample) println(value)
}
* takeOrdered
// Demonstrates the `takeOrdered` action: prints the 5 smallest elements
// according to the implicit ordering of Int.
@Test def takeOrdered(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
  val smallest: Array[Int] = numbers.takeOrdered(5)
  for (value <- smallest) println(value)
}
* top
// Demonstrates the `top` action: prints the 5 largest elements
// according to the implicit ordering of Int.
@Test def top(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
  val largest: Array[Int] = numbers.top(5)
  for (value <- largest) println(value)
}
* first
// Demonstrates the `first` action: prints the first element of the RDD.
@Test def first(): Unit = {
  val numbers: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
  val head: Int = numbers.first()
  println(head)
}
* foreach
// Demonstrates the `foreach` action: applies `println` to every element of the RDD.
// (Renamed from `top`, which duplicated the name of the `top` demo, and the
// leftover commented-out `top(5)` call was removed.)
// NOTE(review): the function runs inside the Spark tasks, so the output appears
// wherever those tasks execute and its order across partitions is not guaranteed.
@Test def foreachTest(): Unit = {
  val rdd: RDD[Int] = sc.parallelize(Array(1, 3, 5, 7, 9, 0, 8, 6, 4, 2))
  rdd.foreach(println)
}
* saveAsTextFile
// Demonstrates the `saveAsTextFile` action: writes the integers 1 to 100,
// held in 4 partitions, as text under the relative path "file/spark/out".
@Test def saveTest(): Unit = {
  val outputPath = "file/spark/out"
  val numbers: RDD[Int] = sc.parallelize(1 to 100, 4)
  numbers.saveAsTextFile(outputPath)
}