只做记录不展示结果(部分结果放在了代码的注释中):
package spark_rdd
import org.apache.spark.sql.SparkSession
object rdd_five extends App{
override def main(args: Array[String]): Unit = {
val p = "-----"*20
val spark = SparkSession.builder.appName("five").master("local[*]").getOrCreate()
val sc = spark.sparkContext
val rdd_1 = sc.parallelize(Seq("hadoop","java","scala","python","spark","linux","scala","python"))
println(rdd_1.getClass())
println(p)//class org.apache.spark.rdd.ParallelCollectionRDD
//创建键值对RDD
val rdd_1_1 = rdd_1.map(x=>(x,1))
println(rdd_1_1.getClass())
println(p)//class org.apache.spark.rdd.MapPartitionsRDD
println(rdd_1_1.collect().foreach(x=>print(x)))
println(p)
//reduceByKey(func)合并具有相同键的值
rdd_1_1.reduceByKey((x,y)=>x+y).foreach(x=>println(x))
println(p)
//groupByKey()对具有相同键的值进行分组
rdd_1_1.groupByKey().foreach(x=>println(x))
print(p)
// combineByKey( createCombiner, mergevalue, mergeCombiners, partitioner)
// 使用不同的返回类型合并具有相同键的值
//mapValues(func)对pair RDD 中的每个值应用一个函数而不改变键
rdd_1_1.mapValues(x=>x+2).foreach(println)
println(p)
//flatMapValues(func)对pair RDD
// 中的每个值应用一个返回迭代器的函数,
// 然后对返回的每个元素都生成一个对应原键的键值对记录
rdd_1_1.flatMapValues(x=>(x to 3)).foreach(println)
println(p)
//返回一个仅包含键的RDD rdd.keys()
rdd_1_1.keys.collect().foreach(println)
println(p)
//返回一个仅包含值的RDD rdd.values()
rdd_1_1.values.collect().foreach(println)//8行1
println(p)
//返回一个根据键排序的RDD rdd.sortByKey()
val rdd_2 = sc.parallelize(Seq(("hadoop",1),("Spark",2),("Java",3),("pyspark",2)))
rdd_2.sortByKey().collect().foreach(println)
println(p)
//针对连个键值对RDD的操作
//subtractByKey删掉 RDD 中键与 other RDD 中的键相同的元素
val rdd_a = sc.parallelize(Seq((1, 2),(3, 4),(3, 6)))
val rdd_b = sc.parallelize(Seq((3,9)))
rdd_a.subtractByKey(rdd_b).foreach(println)//(1,2)
println(p)
//join对两个 RDD 进行内连接
rdd_a.join(rdd_b).foreach(println)
println(p)
//rightOuterJoin对两个 RDD 进行连接操作,确保第一个 RDD 的键必须存在(右外连接)
rdd_a.rightOuterJoin(rdd_b).foreach(println)
println(p)
//leftOuterJoin对两个 RDD 进行连接操作,确保第二个 RDD 的键必须存在(左外连接)
rdd_a.leftOuterJoin(rdd_b).foreach(println)
println(p)
//cogroup将两个 RDD 中拥有相同键的数据分组到一起
rdd_a.cogroup(rdd_b).foreach(println)
println(p)
}
}



