栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据开发之Spark常用RDD算子

大数据开发之Spark常用RDD算子

文章目录
      • 大数据开发之Spark常用RDD算子
        • map
        • flatMap
        • mapPartitions和mapPartitionsWithIndex
        • filter
        • sample
        • union
        • join
        • groupByKey
        • sort,sortBykey
        • MapValues
      • 常用操作算子

大数据开发之Spark常用RDD算子 map

map传入一条数据,返回一条数据
map是对RDD中元素逐一进行函数操作映射为另外一个RDD,
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区

object DemoMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("DemoMap")
      .setMaster("local")

    val sc = new SparkContext(conf)

    val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
    listRDD.map(i=>i+1)//对List集合每个元素进行加1操作
      .foreach(println)
  }
}

flatMap

flatMap简单来说就是传入一条返回N条
flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。
flatMap会自动将结果进行扁平化处理(展开)

object DemoFlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoFlatMap")
    val sc = new SparkContext(conf)
    val lineRDD = sc.parallelize(List("java,scala,python", "hadoop,hbase,hive"))
    lineRDD.map(line=>line.split(",")).foreach(line=>println(line.mkString(",")))
    lineRDD.flatMap(line=>line.split(",")).foreach(println)
  }

}

lineRDD.map(line=>line.split(").foreach(println)如果写成这样只输出的是地址值
lineRDD.map(line=>line.split(",")).foreach(line=>println(line.mkString(",")))输出打印为

lineRDD.flatMap(line=>line.split(",")).foreach(println)输出为

flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。

mapPartitions和mapPartitionsWithIndex
区于foreachPartition(属于Action,且无返回值),而mapPartitions可获取返回值。与map的区别前面已经提到过了,但由于单
独运行于RDD的每个分区上(block),所以在一个类型为T的RDD上运行
时,(function)必须是Iterator => Iterator类型的方法(入参)

与mapPartitions类似,但需要提供一个表示分区索引值的整型值作为参
数,因此function必须是(int, Iterator)=>Iterator类型的
object DemoMapPartitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoMapPartition")
    val sc = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/student.txt", 6)
    println(stuRDD.getNumPartitions)//查看有多少个分区
    // take 也是一个action算子 会返回一个Array
    // 这里的foreach实际上是Array的方法 不是RDD的算子
    stuRDD.take(10).foreach(println)
    
    //按分区去处理数据
    stuRDD.mapPartitions(rdd=>rdd.map(_.split(",")(1)))foreach(println)
    
    //处理每个分区的时候获得一个index
    stuRDD.mapPartitionsWithIndex((index,rdd)=>{
      println("当前遍历的分区:"+index)
      rdd.map(line=>line.split(",")(1))
    }).foreach(println)
  }
}
filter
object DemoFilter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoFilter")
    val sc = new SparkContext(conf)
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
    listRDD.filter(i=>i%2==0).foreach(println)//过滤出偶数
    listRDD.filter(_%2==1)//等价于上面,就是个简写
  }
}
sample

采样操作,用于从样本中取出部分数据。withReplacement是否放回,fraction采样比例,seed用于指定的随机数生成器的种子。(是否放回抽样分true和false,fraction取样比例为(0, 1]。seed种子为整型实数

object DemoSample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoSample")
    val sc = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/student.txt")
    val sm = stuRDD.sample(withReplacement = false, fraction = 0.1)
    sm.foreach(println)
  }

}
union

对于源数据集和其他数据集求并集,不去重。

object DemoUnion {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoUnion")
    val sc= new SparkContext(conf)
    val listRDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
    println(listRDD1.getNumPartitions)
    val listRDD2: RDD[Int] = sc.parallelize(List(8, 9, 10))
    println(listRDD2.getNumPartitions)

    listRDD1.union(listRDD2).foreach(println)
  }
}
join

join加入一个RDD,在一个(k,v)和(k,w)类型的dataSet上调用,返回一个(k,(v,w))的pair dataSet。
leftOuterJoin,rightOuterJoin…都类似MySQL中的连接,不在多说

object DemoJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoJoin")
    val sc = new SparkContext(conf)
    // 构建K-V格式的RDD
    val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "张三"), "002" -> "小红", "003" -> "小明"))
    val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21))
    val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))

    tuple2RDD1.join(tuple2RDD2).map{
      case(id:String,(name:String,age:Int))=>id+","+name+","+age
    }.foreach(println)//等价于下面这个写法

    tuple2RDD1.join(tuple2RDD2).map(kv => {
      val i: String = kv._1
      val j: String = kv._2._1
      val k: Int = kv._2._2
      i + "," + j + "," + k
    }).foreach(println)


    //leftOuterJoin
    tuple2RDD1.leftOuterJoin(tuple2RDD3)
      .map{
        // 关联上的处理逻辑
        case (id: String, (name: String, Some(gender))) =>
          id + "," + name + "," + gender
        // 未关联上的处理逻辑
        case (id: String, (name: String, None)) =>
          id + "," + name + "," + "-"
      }.foreach(println)
  }
}
groupByKey

在一个PairRDD或(k,v)RDD上调用,返回一个(k,Iterable)。主要作用是将相同的所有的键值对分组到一个集合序列当中,其顺序是不确定的。groupByKey是把所有的键值对集合都加载到内存中存储计算,若一个键对应值太多,则易导致内存溢出

object DemoGroupByKey {
  def main(args: Array[String]): Unit = {
    

    // 统计班级人数
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("DemoGroupByKey")

    val sc: SparkContext = new SparkContext(conf)

    // 读取学生数据构建RDD
    val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    stuRDD.groupBy(line => line.split(",")(4))
      .map(kv => (kv._1, kv._2.size))
      .foreach(println)

    
    stuRDD.map(line => (line.split(",")(4), line))
      .groupByKey()
      .map(kv => (kv._1, kv._2.size))
      .foreach(println)
  }
}

sort,sortBykey

同样是基于pairRDD的,根据key值来进行排序。ascending升序,默认为true,即升序;numTasks

def main(args: Array[String]): Unit = {
    
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("DemoSort")

    val sc: SparkContext = new SparkContext(conf)

    // 读取学生数据构建RDD
    val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //ascending=false 降序排列
    stuRDD.sortBy(line=>line.split(",")(0),ascending=false)
      .foreach(println)
    
    stuRDD.map(line=>(line.split(",")(0),line))
      .sortByKey(ascending=false)
      .foreach(println)
  }
}
MapValues
object DemoMapValues {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("DemoMapValues")
    val sc = new SparkContext(conf)
    val listRDD: RDD[(String, Int)] = sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3)))
    listRDD.mapValues(v=>v*v)
      .foreach(println)
  }
}

常用操作算子
object DemoAction {
  def main(args: Array[String]): Unit = {
    
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("DemoAction")

    val sc: SparkContext = new SparkContext(conf)

    // 读取学生数据构建RDD
    val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    // foreach
    stuRDD.foreach(println)

    // take 取出前n条数据 相当于limit
    stuRDD.take(100).foreach(println)

    // count
    // 返回RDD的数据量的多少
    val l: Long = stuRDD.count()
    println(l)

    // collect
    // 将RDD转换为Scala中的Array
    // 注意数据量的大小 容易OOM
    val stuArr: Array[String] = stuRDD.collect()
    stuArr.take(10).foreach(println)

    // reduce 全局聚合
    // select sum(age) from student group by 1
    val i: Int = stuRDD.map(line => line.split(",")(2).toInt)
      .reduce(_ + _)
    println(i)

    // save
    stuRDD
      .sample(withReplacement = false, fraction = 0.2)
      .saveAsTextFile("spark/data/sample")
  }

}

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号