6.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _06TestAggregateByKey_exercise {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // accumulator is (count, sum): the seqOp adds 1 to the count and the value to the sum,
    // the combOp merges the per-partition (count, sum) pairs
    val result: RDD[(String, (Int, Int))] = rdd1.aggregateByKey((0, 0))(
      (x, y) => (x._1 + 1, x._2 + y),
      (x, y) => (x._1 + y._1, x._2 + y._2)
    )
    // then compute the average per key
    val result1: RDD[(String, Double)] = result.map(x => {
      val t = x._2
      val avg = t._2 / t._1.toDouble
      (x._1, avg)
    })
    result1.collect().foreach(println)
    // (b,4.0)
    // (a,1.6666666666666667)
  }
}
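
The same per-key average can also be expressed with combineByKey, where createCombiner builds the initial (sum, count) pair from the first value instead of starting from a fixed zero value. The sketch below is illustrative only; the object name _06bAverageWithCombineByKey and the variable names are made up, and it assumes the same input data as example 6.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _06bAverageWithCombineByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // accumulator is (sum, count); createCombiner turns the first value v into (v, 1)
    val sumCount: RDD[(String, (Int, Int))] = rdd1.combineByKey(
      (v: Int) => (v, 1),
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
    )
    // divide sum by count to get the average per key
    val avg: RDD[(String, Double)] = sumCount.mapValues { case (sum, cnt) => sum / cnt.toDouble }
    avg.collect().foreach(println)
    sc.stop()
  }
}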
7.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// mapValues: transform only the value part of a key-value RDD into another type
object _07MapValueDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // requirement: group by key first, then sum the values
    val value: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    val value1: RDD[(String, Int)] = value.mapValues(_.sum)
    value1.collect().foreach(println)
    // (b,12)
    // (a,5)
    println("*****************")
    // multiply every value by 10 and print the result
    val value2: RDD[(String, Int)] = rdd.mapValues(_ * 10)
    value2.collect().foreach(println)
    // (a,10)
    // (a,20)
    // (b,30)
    // (a,20)
    // (b,40)
    // (b,50)
  }
}
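
As a side note on the groupByKey + mapValues(_.sum) pattern above: the same sums can usually be obtained with reduceByKey, which combines values on the map side before the shuffle and therefore moves less data. A minimal sketch assuming the same input data; the object name _07bReduceByKeySumDemo is hypothetical.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _07bReduceByKeySumDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // reduceByKey pre-aggregates within each partition, then merges across partitions
    val sums: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
    sums.collect().foreach(println) // same (b,12) / (a,5) result as the groupByKey version
    sc.stop()
  }
}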
8.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// combineByKey takes three parameters:
// * first parameter: converts the first value seen for a key into the initial accumulator
// * second parameter: the computation logic within a partition
// * third parameter: the computation logic across partitions
object _08CombineByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // because the first parameter is a function rather than a plain value, the accumulator type
    // cannot be inferred for the other two functions, so their parameter types must be annotated
    val result: RDD[(String, Int)] = rdd.combineByKey(x => x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y)
    result.collect().foreach(println)
    sc.stop()
    // (b,8)
    // (a,4)
  }
}
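
For comparison, the same "max within a partition, sum across partitions" logic can presumably be written with aggregateByKey, where the zero value plays the role of combineByKey's createCombiner. A sketch under that assumption; the object name _08bAggregateByKeyEquivalent is made up.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _08bAggregateByKeyEquivalent {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // zero value Int.MinValue is neutral for max, so the intra-partition step
    // behaves like combineByKey's x => x followed by math.max
    val result: RDD[(String, Int)] = rdd.aggregateByKey(Int.MinValue)((x, y) => math.max(x, y), _ + _)
    result.collect().foreach(println) // expected to match example 8: (b,8), (a,4)
    sc.stop()
  }
}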
9.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _09ReduceAggregateFoldCombineByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // three equivalent ways to sum the values per key
    rdd.reduceByKey(_ + _)
    rdd.aggregateByKey(0)(_ + _, _ + _)
    rdd.foldByKey(0)(_ + _)
    // combineByKey: max within each partition, then sum across partitions
    val value: RDD[(String, Int)] = rdd.combineByKey(x => x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y)
    value.collect().foreach(println)
    // (b,8)
    // (a,4)
  }
}
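
To make the comparison in example 9 concrete, the sketch below (hypothetical object name _09bSumByKeyComparison) actually collects the results of the three summing operators; all three should yield the per-key sums (a,5) and (b,12), while the combineByKey variant above computes something different (max per partition, then sum).

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _09bSumByKeyComparison {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
    // each line should print the per-key sums (a,5) and (b,12), possibly in a different order
    println(rdd.reduceByKey(_ + _).collect().toList)
    println(rdd.aggregateByKey(0)(_ + _, _ + _).collect().toList)
    println(rdd.foldByKey(0)(_ + _).collect().toList)
    sc.stop()
  }
}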
10.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _10JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4), ("d", 5)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8), ("e", 9)), 2)
    // inner join: only keys present in both RDDs
    val value1: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    // left outer join: all keys of rdd1, the right value wrapped in Option
    val value2: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
    // right outer join: all keys of rdd2, the left value wrapped in Option
    val value3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
    value1.collect().foreach(println)
    println("----------------------------")
    value2.collect().foreach(println)
    println("----------------------------")
    value3.collect().foreach(println)
    // (b,(3,6))
    // (b,(3,7))
    // (a,(1,5))
    // (a,(2,5))
    // (c,(4,8))
    // ----------------------------
    // (d,(5,None))
    // (b,(3,Some(6)))
    // (b,(3,Some(7)))
    // (a,(1,Some(5)))
    // (a,(2,Some(5)))
    // (c,(4,Some(8)))
    // ----------------------------
    // (b,(Some(3),6))
    // (b,(Some(3),7))
    // (e,(None,9))
    // (a,(Some(1),5))
    // (a,(Some(2),5))
    // (c,(Some(4),8))
  }
}
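
Besides join, leftOuterJoin and rightOuterJoin, Spark also provides fullOuterJoin, which keeps the keys of both sides and wraps both values in Option. A minimal sketch with the same two RDDs; the object name _10bFullOuterJoinDemo is made up.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _10bFullOuterJoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4), ("d", 5)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8), ("e", 9)), 2)
    // keys that appear on only one side show up with None on the other side
    val value: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
    value.collect().foreach(println)
    sc.stop()
  }
}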
11.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _11CogroupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4), ("d", 5)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8), ("e", 9)), 2)
    // cogroup collects, for every key from either RDD, all values from rdd1 and all values from rdd2
    val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
    value.collect().foreach(println)
    // (d,(CompactBuffer(5),CompactBuffer()))
    // (b,(CompactBuffer(3),CompactBuffer(6, 7)))
    // (e,(CompactBuffer(),CompactBuffer(9)))
    // (a,(CompactBuffer(1, 2),CompactBuffer(5)))
    // (c,(CompactBuffer(4),CompactBuffer(8)))
  }
}
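
cogroup is the building block behind the join family: an inner join can be reproduced by taking the cartesian product of the two per-key buffers, and keys missing from either side drop out because one buffer is empty. The sketch below is illustrative only; the object name _11bJoinViaCogroup is hypothetical.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _11bJoinViaCogroup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4), ("d", 5)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8), ("e", 9)), 2)
    // pair every left value with every right value for the same key
    val joined: RDD[(String, (Int, Int))] = rdd1.cogroup(rdd2).flatMapValues {
      case (left, right) => for (l <- left; r <- right) yield (l, r)
    }
    joined.collect().foreach(println) // expected to match the rdd1.join(rdd2) output of example 10
    sc.stop()
  }
}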



