
Spark RDD Operators


Spark RDD operators fall into two categories: Transformations and Actions.

Transformation: evaluated lazily. A transformation only records metadata about the computation; nothing is actually computed until an Action triggers a job.

Action: evaluated eagerly. Calling an action loads the data and starts the computation immediately.

There are two ways to create an RDD:
1. From a file system, with sc.textFile("/root/words.txt").
2. By parallelizing a Scala collection: val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

The definitions below appear to be copied from spark-shell tab completion, so the element type is shown as the concrete type of the RDD in that session (Int, String, and so on) rather than as a generic T.

parallelize
Definition: def parallelize[T](seq: Seq[T],numSlices: Int)(implicit evidence$1: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6,7),2) // split into two partitions
scala> rdd1.map(_*2).collect
res6: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14)

makeRDD
Definition: def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit evidence$3: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
def makeRDD[T](seq: Seq[T],numSlices: Int)(implicit evidence$2: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
scala> val rdd3=sc.makeRDD(1 to 10)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at :24
scala> rdd3.collect
res40: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

map // processes one element (one line of data) at a time
Definition: def map[U](f: Int => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6,7),2) // split into two partitions
scala> rdd1.map(_*2).collect
res6: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14)
scala> rdd.map(x=>(x,1)).collect // builds key-value pairs; the tuple must be wrapped in parentheses
res17: Array[(Int, Int)] = Array((1,1), (2,1), (9,1), (7,1), (4,1))

mapPartitions // processes one partition (a batch of data) at a time
Definition: def mapPartitions[U](f: Iterator[String] => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$6: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
scala> var rdd1 = sc.makeRDD(1 to 5,4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24
scala> var rdd3 = rdd1.mapPartitions{ x => {
     |   var result = List[Int]()
     |   var i = 0
     |   while(x.hasNext){
     |     i += x.next()
     |   }
     |   result.::(i).iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at mapPartitions at :25
scala> rdd3.collect // 1 to 5 is split into four partitions and each partition is summed separately
res2: Array[Int] = Array(1, 2, 3, 9)

mapPartitionsWithIndex
Definition: def mapPartitionsWithIndex[U](f: (Int, Iterator[Int]) => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$9: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
var rdd1 = sc.makeRDD(1 to 5, 2) // rdd1 has two partitions
var rdd2 = rdd1.mapPartitionsWithIndex { (x, iter) => {
  var result = List[String]()
  var i = 0
  while (iter.hasNext) {
    i += iter.next()
  }
  result.::(x + "|" + i).iterator
}}
// rdd2 sums the numbers in each partition of rdd1 and prefixes each sum with the partition index
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)

partitions
Definition: final def partitions: Array[org.apache.spark.Partition]
scala> rdd1.partitions.size
res3: Int = 2
scala> rdd1.partitions.length
res3: Int = 2

repartition(3) // changes the number of partitions
Definition: def repartition(numPartitions: Int)(implicit ord: Ordering[Int]): org.apache.spark.rdd.RDD[Int]
scala> rdd.repartition(3)
res17: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at :26
scala> res17.glom.collect
res19: Array[Array[Int]] = Array(Array(4), Array(1, 5), Array(2, 3))

sortBy
Definition: def sortBy[K](f: Int => K,ascending: Boolean,numPartitions: Int)(implicit ord: Ordering[K],implicit ctag: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[Int]
scala> val rdd2=sc.parallelize(List(1,2,3,4,5,6,8),2) // split into two partitions
scala> rdd2.sortBy(x=>x,true).collect // ascending
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 8)
scala> rdd2.sortBy(x=>x,false).collect // descending
res11: Array[Int] = Array(8, 6, 5, 4, 3, 2, 1)
scala> rdd2.sortBy(x=>x,false,3).glom.collect // with an explicit number of partitions
res18: Array[Array[Int]] = Array(Array(8, 6), Array(5, 4), Array(3, 2, 1))

glom
Definition: def glom(): org.apache.spark.rdd.RDD[Array[Int]]
glom turns each partition into an array. In early Spark versions this was implemented by returning a GlommedRDD; current versions return a MapPartitionsRDD.
scala> rdd2.sortBy(x=>x,false,3).glom.collect
res18: Array[Array[Int]] = Array(Array(8, 6), Array(5, 4), Array(3, 2, 1))

collect
Definition: def collect[U](f: PartialFunction[Int,U])(implicit evidence$30: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
def collect[B, That](pf: PartialFunction[(Int, Int),B])(implicit bf: scala.collection.generic.CanBuildFrom[Array[(Int, Int)],B,That]): That
def collect[B, That](pf: PartialFunction[(Int, Int),B])(implicit bf: scala.collection.generic.CanBuildFrom[scala.collection.mutable.WrappedArray[(Int, Int)],B,That]): That
def collect(): Array[Int]
Only the first and last signatures belong to RDD. The two overloads that take a CanBuildFrom are the collect method of Scala collections (here an Array), not of RDD.
scala> rdd.collect
res4: Array[Int] = Array(1, 2, 3, 4, 7, 9)
scala> rdd1.collect({case (5,_) => "A";case (2,_)=>(2,2)})
res30: Array[java.io.Serializable] = Array((2,2), A)

textFile
Definition: def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]
scala> val rdd1=sc.textFile("hdfs://192.168.153.133:9000/spark/sparktmp.txt") // read a file from HDFS
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.153.133:9000/spark/sparktmp.txt MapPartitionsRDD[12] at textFile at :24
scala> rdd1.collect
res28: Array[String] = Array(hello java, hi scala, how are you scala, spark)
scala> val rdd1=sc.textFile("file:///opt/sparktmp/helloworld.txt") // read a file from the local file system
rdd1: org.apache.spark.rdd.RDD[String] = file:///opt/sparktmp/helloworld.txt MapPartitionsRDD[14] at textFile at :24
scala> rdd1.collect
res29: Array[String] = Array(hello java, hi scala, how are you scala, spark)
scala> val rdd2=sc.textFile("file:///opt/sparktmp/helloworld.txt",4) // a minimum number of partitions can be passed as well
rdd2: org.apache.spark.rdd.RDD[String] = file:///opt/sparktmp/helloworld.txt MapPartitionsRDD[6] at textFile at :24
scala> rdd2.getNumPartitions
res4: Int = 4

filter
Definition: def filter(f: Int => Boolean): org.apache.spark.rdd.RDD[Int]
scala> rdd3.filter(x=>x>2).collect
res11: Array[Int] = Array(3, 4, 5)

fold
Definition: def fold(zeroValue: Int)(op: (Int, Int) => Int): Int
fold applies zeroValue once inside every partition and once more when the per-partition results are merged, which is why the value 5 appears repeatedly in the trace below.
scala> rdd3.fold(5)((x,y)=>{
     |   val s = "seq_exp = %d + %d"
     |   println(s.format(x, y))
     |   x+y})
seq_exp = 5 + 1
seq_exp = 5 + 2
seq_exp = 5 + 3
seq_exp = 5 + 5
seq_exp = 5 + 4
seq_exp = 5 + 6
seq_exp = 11 + 5
seq_exp = 16 + 5
seq_exp = 21 + 7
seq_exp = 28 + 10
seq_exp = 38 + 8
seq_exp = 46 + 9
seq_exp = 55 + 5
res15: Int = 60
scala> rdd.fold(0)((x,y)=>x+y)
res0: Int = 15

foldByKey // for key-value RDDs
Definition: def foldByKey(zeroValue: Double)(func: (Double, Double) => Double): org.apache.spark.rdd.RDD[(String, Double)]
def foldByKey(zeroValue: Double,numPartitions: Int)(func: (Double, Double) => Double): org.apache.spark.rdd.RDD[(String, Double)]
def foldByKey(zeroValue: Double,partitioner: org.apache.spark.Partitioner)(func: (Double, Double) => Double): org.apache.spark.rdd.RDD[(String, Double)]
scala> a
res25: Array[(String, Double)] = Array((Alice,90.0), (Bob,100.0), (Tom,93.0), (Alice,95.0), (Bob,70.0), (Jack,98.0))
scala> val rdd=sc.parallelize(a,2)
rdd: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[5] at parallelize at :26
scala> rdd.foldByKey(1000)(_+_).collect
res27: Array[(String, Double)] = Array((Tom,1093.0), (Alice,2185.0), (Bob,2170.0), (Jack,1098.0))
zeroValue is added once per key in each partition that contains the key. Alice and Bob occur in both partitions, so 1000 is added twice for them; Tom and Jack occur in one partition each, so it is added once.

reduce
Definition: def reduce(f: (Int, Int) => Int): Int
scala> val rdd=sc.parallelize(1 to 3,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3)
scala> rdd.reduce((x,y)=>x+y)
res2: Int = 6

reduceByKey
Definition: def reduceByKey(func: (V, V) => V): RDD[(K, V)]
import org.apache.spark.rdd.RDD
val value: RDD[(Int, Int)] = rdd2.map((1, _)).reduceByKey((x,y)=>x+y) // values with the same key are added together
value.collect().foreach(println)
(1,120)

groupBy // turns the data into key-value form, keyed by the result of f
Definition: def groupBy[K](f: Int => K,p: org.apache.spark.Partitioner)(implicit kt: scala.reflect.ClassTag[K],implicit ord: Ordering[K]): org.apache.spark.rdd.RDD[(K, Iterable[Int])]
def groupBy[K](f: Int => K,numPartitions: Int)(implicit kt: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[(K, Iterable[Int])]
def groupBy[K](f: Int => K)(implicit kt: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[(K, Iterable[Int])]
scala> val rdd=sc.parallelize(1 to 3,2)
scala> rdd.groupBy(x=>x+1)
res2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at :26
scala> rdd.groupBy(x=>x+1).collect
res3: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(3)), (2,CompactBuffer(1)), (3,CompactBuffer(2)))

groupByKey
Definition: def groupByKey(): org.apache.spark.rdd.RDD[(Int, Iterable[Int])]
def groupByKey(numPartitions: Int): org.apache.spark.rdd.RDD[(Int, Iterable[Int])]
def groupByKey(partitioner: org.apache.spark.Partitioner): org.apache.spark.rdd.RDD[(Int, Iterable[Int])]
val rdd2=sc.parallelize(Array(18, 19, 20, 21, 22, 20))
rdd2.map((1, _)).groupByKey().collect().foreach(println)
(1,CompactBuffer(18, 19, 20, 21, 22, 20))

combineByKey
Definition: def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(Int, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(Int, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(Int, C)]
import org.apache.spark.rdd.RDD
val rdd: RDD[String] = sc.textFile("in/age.csv")
// Take the second space-separated field of each line as the age
val rdd2 = rdd.map(x => {
  val a = x.split(" ")
  a(1).toInt
})
val value: RDD[(Int, (Int, Int))] = rdd2.map((1, _)).combineByKey(
  x => (x, 1), // first value seen for a key in a partition: start a (sum, count) pair
  (x: (Int, Int), y: Int) => { // within a partition: x is the running (sum, count), y is the incoming age
    (x._1 + y, x._2 + 1)
  },
  (x: (Int, Int), y: (Int, Int)) => { // across partitions: add the partial (sum, count) pairs
    (x._1 + y._1, x._2 + y._2)
  })
value.collect().foreach(println)
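The fold and foldByKey results above are easier to follow once the partition count is made explicit, and the (sum, count) pair built by combineByKey is usually turned into an average as a last step. The following is a minimal sketch of those three points, not the original author's code. It assumes a local Spark installation; the app name, the local[2] master, and the choice of 8 partitions for the numbers 1 to 5 are illustrative assumptions. It is written as script-style statements so that it can be pasted into spark-shell, where getOrCreate reuses the existing context.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (e.g. in spark-shell) or start a local one.
// App name and master URL are assumptions made for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-operator-sketch").setMaster("local[2]"))

// fold: zeroValue is used once in every partition (even an empty one)
// and once more when the partition results are merged on the driver.
val nums = sc.parallelize(1 to 5, 8)   // 5 elements spread over 8 partitions
val folded = nums.fold(5)(_ + _)       // 15 + 5 * (8 + 1) = 60

// foldByKey: zeroValue is used once per key in each partition holding that key.
// With 2 partitions, the first three pairs land in partition 0, the rest in partition 1.
val scores = sc.parallelize(Seq(
  ("Alice", 90.0), ("Bob", 100.0), ("Tom", 93.0),
  ("Alice", 95.0), ("Bob", 70.0), ("Jack", 98.0)), 2)
val foldedByKey = scores.foldByKey(1000.0)(_ + _).collectAsMap()
// Alice -> 2185.0, Bob -> 2170.0, Tom -> 1093.0, Jack -> 1098.0

// combineByKey: build a (sum, count) pair per key, then divide to get the average.
val avgByKey = scores.combineByKey[(Double, Int)](
  (v: Double) => (v, 1),                                        // first value of a key in a partition
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),  // merge a value within a partition
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // merge across partitions
).mapValues { case (sum, n) => sum / n }.collectAsMap()
// Alice -> 92.5, Bob -> 85.0, Tom -> 93.0, Jack -> 98.0

println(s"fold = $folded, foldByKey = $foldedByKey, averages = $avgByKey")
```

Because the zero value is applied more than once, a neutral element (0 for addition, 1 for multiplication) is the safe choice whenever the result must not depend on how the data happens to be partitioned. When running this outside spark-shell, call sc.stop() at the end.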