栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Shell 操作RDD

Spark Shell 操作RDD

Spark Shell 操作RDD
    • 一、Spark Shell 简述
    • 二、RDD创建方式
    • 三、RDD之常见算子

一、Spark Shell 简述
  • 【Spark-shell】
    是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用- scala编写spark程序。
  • 【两种启动Spark Shell】
    • 本地模式启动:bin/spark-shell

    • 集群模式启动:
      如:spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
      参数说明:
      –master spark://spark81:7077 指定Master的地址
      –executor-memory 2g 指定每个worker可用内存为2G
      –total-executor-cores 2 指定整个集群使用的cup核数为2个

    • 注意:
      如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。
      请注意local模式和集群模式的日志区别:

二、RDD创建方式
  1. 采取默认分区进行创建,有如下两种

    1. 读取外部文件系统(有多种)生成RDD
      val textRdd = sc.textFile("/tools/word.txt")

    2. 通过并行化产生RDD
      val parallelizeRDD = sc.parallelize(Array(1,2,3,4,5,5,5))

    3. 特点: 没有指定分区数量,采用默认的分区数(等于当前集群节点的CPU core)
      查看分区数量:
      val partitions = textRdd.partitions
      结果:
      Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@3c1, org.apache.spark.rdd.HadoopPartition@3c2)

  2. 指定分区数量产生RDD,有如下两种:

    1. 读取外部文件系统(有多种)生成RDD
      val textRdd = sc.textFile("/tools/word.txt",3)
      查看分区数量:
      val partitions = textRdd.partitions
      结果:
      Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@41e, org.apache.spark.rdd.HadoopPartition@41f, org.apache.spark.rdd.HadoopPartition@420)

    2. 通过并行化产生RDD
      val parallelizeRDD = sc.parallelize(Array(1,2,3,4,5,5,5),4)
      查看分区数量:
      val partitions = textRdd.partitions
      结果:
      Array(org.apache.spark.rdd.ParallelCollectionPartition@75e, org.apache.spark.rdd.ParallelCollectionPartition@75f, org.apache.spark.rdd.ParallelCollectionPartition@760, org.apache.spark.rdd.ParallelCollectionPartition@761)

三、RDD之常见算子

一、Transformation算子
特点:延迟加载,记录应用于基础数据集(RDD)的动作, 当触发action算子(会生成Job)时真正执行
API解释与使用说明:
官网文档:Spark API

  1. 例子:

    1. 创建RDD:val testRdd = sc.parallelize(Array(1,3,2,5,6,7,10))

    2. map(func)

      	val mapRdd = testRdd.map(_+1)
      	val result = mapRdd.collect
      

      结果:
      Array[Int] = Array(2, 4, 3, 6, 7, 8, 11)

    3. filter(func)
      val fliterRdd = mapRdd.filter(_ > 5)
      collect结果:

      	Array[Int] = Array(6, 7, 8, 11)
      
    4. flatMap(func)
      val flatRdd = mapRdd.flatMap(_.toString)
      collect结果:Array[Char] = Array(2, 4, 3, 6, 7, 8, 1, 1)

    5. mapPartitions(func)

      def func1(iter:Iterator[Int]):Iterator[String] ={
      	iter.toList.map( x => "[value=" + x + "]" ).iterator
      }
      val mapPRDD = mapRdd.mapPartitions(func1)
      mapPRDD.collect
      

      collect结果:Array[String] = Array([value=2], [value=4], [value=3], [value=6], [value=7], [value=8], [value=11])
      下面是另外一个例子:

      val textFileRDD = sc.textFile("/tools/word.txt")
      val flatMapRDD = textFileRDD.flatMap(x => x.split(" "))
      def func2(iter:Iterator[String]):Iterator[(String,Int)] ={
      	iter.toList.map( x => (x,1) ).iterator
      }
      val mapPartRDD = flatMapRDD.mapPartitions(func2)
      mapPartRDD.collect
      

      结果:

      Array[(String, Int)] = Array((I,1), (love,1), (Guizhou,1), (I,1), (love,1), (Guiyang,1), (Guiyang,1), (is,1), (the,1), (capital,1), (of,1), (Guizhou,1))
      
    6. mapPartitionsWithIndex(index,func)

      def func2(index:Int,iter:Iterator[Int]):Iterator[String] ={
      	iter.toList.map( x => "[PartId:" + index + ",value=" + (x * 4)  + "]" ).iterator
      }
      val mapPRDD2 = mapRdd.mapPartitionsWithIndex(func2)
      mapPRDD2 .collect
      

      collect结果:

      Array[String] = Array([PartId:0,value=8], [PartId:0,value=16], [PartId:0,value=12], [PartId:1,value=24], [PartId:1,value=28], [PartId:1,value=32], [PartId:1,value=44])
      

      注意:PartId是分区号
      再举一个例子:

      def func22(index:Int,iter:Iterator[String]):Iterator[(Int,(String,Int))] ={
      	iter.toList.map( x => (index,(x,1)) ).iterator
      }
      val mapPartRDD2 = flatMapRDD.mapPartitionsWithIndex(func22)
      mapPartRDD2.collect
      

      collect结果:

      Array[(Int, (String, Int))] = Array((0,(I,1)), (0,(love,1)), (0,(Guizhou,1)), (0,(I,1)), (0,(love,1)), (0,(Guiyang,1)), (0,(Guiyang,1)), (0,(is,1)), (0,(the,1)), (0,(capital,1)), (0,(of,1)), (0,(Guizhou,1)))
      
    7. repartition(numpartitions)

      val newRDD = testRdd.repartition(3)
      val partRDD = newRDD.mapPartitionsWithIndex(func2)
      partRDD.collect
      

      结果:

      Array[String] = Array([PartId:0,value=24], [PartId:0,value=8], [PartId:1,value=4], [PartId:1,value=28], [PartId:2,value=20], [PartId:2,value=40], [PartId:2,value=12])
      
  2. 上述例子提供的重要信息:
    1、spark提供了很多的transformation算子
    2、业务逻辑通常是需要自己编写,以自定义函数的形式编程业务逻辑
    3、将自定义的函数作为spark的transformation算子参数,借助spark完成业务逻辑计算

二、Action算子

创建RDD:val testRdd = sc.parallelize(Array(1,3,2,5,6,7,10))
进行前提转换:

val mapRdd = testRdd.map(_+1)
def func2(index:Int,iter:Iterator[Int]):Iterator[String] ={
	iter.toList.map( x => "[PartId:" + index + ",value=" + (x * 4)  + "]" ).iterator
}
val mapPRDD2 = mapRdd.mapPartitionsWithIndex(func2)
  1. foreach

    mapPRDD2.foreach(println)
    

    结果被打印在worker节点上:

  2. collect

    mapPRDD2.collect
    

    结果显示在Driver Program所在的机器上,如下所示:

  3. 执行逻辑或流程如下图所示:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389622.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号