栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark KV类型算子案例详解二

Spark KV类型算子案例详解二

6.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _06TestAggregateByKey_exercise {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(conf)
       
        val rdd1: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)

       
        val result: RDD[(String, (Int, Int))] = rdd1.aggregateByKey((0, 0))(
            (x, y) => (x._1 + 1, x._2 + y),
            (x, y) => (x._1 + y._1, x._2 + y._2)
        )
        //继续求平均值
        val result1: RDD[(String, Double)] = result.map(x => {
            var t = x._2
            var avg = t._2 / t._1.toDouble
            (x._1, avg)
        })


        result1.collect().foreach(println)
        // (b,4.0)
        //(a,1.6666666666666667)
    }
}


7.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//作用: 将kv对形式的RDD的v映射成别的类型
object _07MapValueDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(conf)

        val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)

        //需求,按key先分组 ,再求和
        val value: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        val value1: RDD[(String, Int)] = value.mapValues(_.sum)
        value1.collect().foreach(println)
        //(b,12)
        //(a,5)

        println("*****************")
        //将rdd*10进行输出
        val value2: RDD[(String, Int)] = rdd.mapValues(_ * 10)
        value2.collect().foreach(println)
        //(a,10)
        //(a,20)
        //(b,30)
        //(a,20)
        //(b,40)
        //(b,50)
    }

}


8.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//         *   第一个参数:就是第一个value的转换操作,使之当成默认值
//         *   第二个参数:用于指定分区内的计算逻辑:
//         *   第三个参数:用于指定分区间的计算逻辑
object _08ConbineByKeyDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(conf)

        val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)
        //  由于第一个参数是一个函数,而不是一个普通的值,因此对于其他两个参数来说,是动态获取的,那么应该指定一下
        val result: RDD[(String, Int)] = rdd.combineByKey(x => x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y)
        result.collect().foreach(println)
        sc.stop()
        // (b,8)
        //(a,4)


    }
}


9.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _09ReduceAggregateFlodCombineByKeyDemo {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("test").setMaster("local[*]")
            val sc = new SparkContext(conf)

            val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)

            rdd.reduceByKey(_+_)
            rdd.aggregateByKey(0)(_+_,_+_)
            rdd.foldByKey(0)(_+_)
            val value: RDD[(String, Int)] = rdd.combineByKey(x=>x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y)
            value.collect().foreach(println)

            //(b,8)
            //(a,4)
        }
    }


10.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _10JoinDemo {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("test").setMaster("local[*]")
            val sc = new SparkContext(conf)

            val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4),("d",5)), 2)
            val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8),("e",9)), 2)
           
            val value1: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
            val value2: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
            val value3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
            value1.collect().foreach(println)
            println("----------------------------")
            value2.collect().foreach(println)
            println("----------------------------")
            value3.collect().foreach(println)

            //(b,(3,6))
            //(b,(3,7))
            //(a,(1,5))
            //(a,(2,5))
            //(c,(4,8))
            //----------------------------
            //(d,(5,None))
            //(b,(3,Some(6)))
            //(b,(3,Some(7)))
            //(a,(1,Some(5)))
            //(a,(2,Some(5)))
            //(c,(4,Some(8)))
            //----------------------------
            //(b,(Some(3),6))
            //(b,(Some(3),7))
            //(e,(None,9))
            //(a,(Some(1),5))
            //(a,(Some(2),5))
            //(c,(Some(4),8))
        }
    }

11.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _11CogroupDemo {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("test").setMaster("local[*]")
            val sc = new SparkContext(conf)

            val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4),("d",5)), 2)
            val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8),("e",9)), 2)

           
            val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
            value.collect().foreach(println)
            // (d,(CompactBuffer(5),CompactBuffer()))
            //(b,(CompactBuffer(3),CompactBuffer(6, 7)))
            //(e,(CompactBuffer(),CompactBuffer(9)))
            //(a,(CompactBuffer(1, 2),CompactBuffer(5)))
            //(c,(CompactBuffer(4),CompactBuffer(8)))
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342739.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号