栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RDD编程-RDD算子的使用

RDD编程-RDD算子的使用

文章目录
  • 前言
  • 一、map
  • 二、mapPartition
  • 三、mapPartitionsWithIndex
  • 四、flatMap
  • 五、glom
  • 六、mapValues
  • 七、filter
  • 八、keyBy
  • 九、groupBy
  • 十、reduceByKey
  • 十一、foldByKey
  • 提示


前言

以下将会介绍常用RDD算子的使用介绍


一、map
    
    @Test def mapTest(): Unit = {
        // 1. 实例化一个集合
        val array: Array[String] = Array("dog", "cat", "elephent", "lion", "tiger", "monkey", "panda")
        // 2. 通过集合,创建RDD
        val rdd1: RDD[String] = sc.parallelize(array)
        // 3. 映射元素(需求:元素都替换成自己的长度)
        val rdd2: RDD[Int] = rdd1.map(_.length)
        // 4. 输出rdd2中描述的数据
        rdd2.foreach(println)

        // 5. 映射元素(需求:元素都替换成(元素, 长度))
        val rdd3: RDD[(String, Int)] = rdd1.map(word => (word, word.length))
        rdd3.foreach(println)
    }
二、mapPartition
    @Test def mapPartitionsTest(): Unit = {
        // 1. 实例化一个集合
        val array: Array[String] = Array("宋江", "卢俊义", "吴用", "公孙胜", "关胜")
        // 2. 创建一个RDD
        val rdd: RDD[String] = sc.parallelize(array, 2)//2->分区,将这个集合切分成两个分区
        // 3. 元素映射(参数是一个迭代器 iterator,返回值要求也是一个迭代器 iterator)
        val rdd1: RDD[Int] = rdd.mapPartitions(iterator => iterator.map(_.length))

        rdd1.foreach(println)
    }
三、mapPartitionsWithIndex
    @Test def mapPartitionsWithIndexTest(): Unit = {
        // 1. 通过集合,创建RDD
        val rdd: RDD[String] = sc.parallelize(Array("金莲小姐姐", "婆惜小姐姐", "三娘小姐姐", "师师小姐姐"), 3)
        // 2. 需求: 将原来的元素映射成 分区号+名字+名字的长度
        val rdd1: RDD[(Int, String, Int)] = rdd.mapPartitionsWithIndex((index, iterator) => iterator.map(str => (index, str, str.length)))

        rdd1.foreach(println)
    }
四、flatMap
    @Test def flatMapTest(): Unit = {
        // 1. 通过集合,创建RDD
        val rdd1: RDD[Array[String]] = sc.parallelize(Array(Array("赵云", "关羽", "张飞", "黄忠", "马超"), Array("张辽", "许褚", "典韦"), Array("陆逊", "张昭", "周瑜")))
        val rdd2: RDD[String] = rdd1.flatMap(_.iterator)  // _.toList
        rdd2.foreach(println)

        // 2. 通过集合,创建RDD
        val rdd3: RDD[String] = sc.parallelize(Array("赵云  关羽  张飞", "诸葛亮  司马懿  周瑜", "黄盖  陆逊"))
        val rdd4: RDD[String] = rdd3.flatMap(_.split(" +"))
        rdd4.foreach(println)

        // 3.
        val rdd5: RDD[Array[String]] = sc.parallelize(Array(Array("赵云  关羽   张飞", "诸葛亮 司马懿 周瑜"), Array("孙策  大乔", "周瑜  小乔")))
        // val rdd6: RDD[String] = rdd5.flatMap(array => array.flatMap(_.split(" +")))
        val rdd6: RDD[String] = rdd5.flatMap(_.flatMap(_.split(" +")))
        rdd6.foreach(println)


        // Array("a b c ", "d e f")  =>  Array ("a", "b", "c", "d", "e", "f")
    }

五、glom
    @Test def glomTest(): Unit = {
        // 1. 通过集合,创建RDD(将1到20的数字,分到了4个分区中)
        val rdd1: RDD[Int] = sc.parallelize(1 to 20, 4)
        // 2. 将每一个分区的元素做成一个数组
        val rdd2: RDD[Array[Int]] = rdd1.glom()

        rdd2.foreach(array => println(array.mkString(", ")))

        // 输出RDD中的分区数量
        println(rdd2.getNumPartitions)
    }
六、mapValues
    @Test def mapValues(): Unit = {
        // 1. 通过集合创建RDD
        val rdd1: RDD[String] = sc.parallelize(Array("贾宝玉", "林黛玉", "薛宝钗", "探春", "迎春", "惜春"))
        // 2. 对元素做映射,以名字的长度作为键,以名字作为值
        val rdd2: RDD[(Int, String)] = rdd1.map(n => (n.length, n))
        // 3. 让每一个人的名字后面添加一个叹号
        val rdd3: RDD[(Int, String)] = rdd2.mapValues(_ + "!")
        rdd3.foreach(println)

        //
        val array1: Array[String] = Array("贾宝玉", "林黛玉", "薛宝钗", "探春")
        val array2: Array[Int] = Array(18, 19, 17, 16)
        val pairs: Array[(String, Int)] = array1.zip(array2)
        val rdd4: RDD[(String, Int)] = sc.parallelize(pairs)
        // 需求: 让每一个人的年龄增20岁!
        val rdd5: RDD[(String, Int)] = rdd4.mapValues(_ + 20)
        rdd5.foreach(println)
    }
七、filter
    @Test def filterTest(): Unit = {
        // 1. 通过集合,创建RDD
        val rdd1: RDD[(String, Int)] = sc.parallelize(List(("贾宝玉", 18), ("林黛玉", 17), ("薛宝钗", 18), ("探春", 16)))
        // 2. 保留所有的成年的数据
        val rdd2: RDD[(String, Int)] = rdd1.filter(_._2 >= 18)

        rdd2.foreach(println)
    }
八、keyBy
    @Test def keyByTest(): Unit = {
        // 1. 通过集合,创建RDD
        val rdd1: RDD[String] = sc.parallelize(Array("张角", "张宝", "董卓", "貂蝉", "诸葛亮", "司马懿", "关云长", "张翼德", "诸葛武侯"))
        // 2. 给这些元素找键
        val rdd2: RDD[(Int, String)] = rdd1.keyBy(_.length)

        rdd2.foreach(println)
    }
九、groupBy
    @Test def groupByTest(): Unit = {
        // 1. 通过集合,创建RDD
        val rdd1: RDD[String] = sc.parallelize(Array("张角", "张宝", "董卓", "貂蝉", "诸葛亮", "司马懿", "关云长", "张翼德", "诸葛武侯"))
        // 2. 将所有的相同的长度的名字视为一个分组
        val rdd2: RDD[(Int, Iterable[String])] = rdd1.groupBy(_.length)

        // val rdd3: RDD[(Int, List[String])] = rdd2.mapValues(_.toList)
      //rdd2.foreach(x=>println(x))

        // 3. 遍历输出结果
        rdd2.foreach(tuple => println(s"${tuple._1} => ${tuple._2}"))
    }
十、reduceByKey
    @Test def reduceByKeyTest(): Unit = {
        // 1. 通过集合,创建一个RDD
        val rdd1: RDD[String] = sc.parallelize(Array("Tom  Jerry  Tom  Jerry", "Tom  Jerry  Tom  Jerry", "Hank  Hank  Tom  Jerry"))
        // 2. 计算wordcount
        val rdd2: RDD[String] = rdd1.flatMap(_.split(" +"))
        // 3. 以单词为键,1作为值,构成一个PairedRDD
        val rdd3: RDD[(String, Int)] = rdd2.map(n => (n, 1))
        // 4. 将相同的键视为一个分组,将值进行累加
        val rdd4: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)

        rdd4.foreach(println)
    }
十一、foldByKey
    
    @Test def foldByKeyTest(): Unit = {
        // 1. 通过集合,准备RDD
        val rdd1: RDD[String] = sc.parallelize(Array("三国演义", "水浒传", "红楼梦", "西游记", "诛仙", "神墓", "斗罗大陆", "斗破苍穹", "武动乾坤", "大主宰", "遮天"))
        // 2. 添加键
        // rdd1.map(n => (n.length, n))
        val rdd2: RDD[(Int, String)] = rdd1.keyBy(_.length)
      rdd2.foreach(println)

        val rdd3: RDD[(Int, String)] = rdd2.reduceByKey(_ + "," + _)
        rdd3.foreach(println)

        // 3.
        val rdd4: RDD[(Int, String)] = rdd2.foldByKey("书名: ")(_ + ", " + _)
        rdd4.foreach(println)
    }
提示

建议自己跑一下程序,感受一下细节之处。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/602111.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号