栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他 > Spark

Spark学习之RDD简单算子

Spark 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力



collect

返回RDD的所有元素

  1. scala> var input=sc.parallelize(Array(-1,0,1,2,2)) input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at :27 
  2. scala> var result=input.collect result: Array[Int] = Array(-1, 0, 1, 2, 2) 

count,coutByValue

count返回RDD的元素数量,countByValue返回每个值的出现次数

  1. scala> var input=sc.parallelize(Array(-1,0,1,2,2)) scala> var result=input.count 
  2. result: Long = 5 scala> var result=input.countByValue 
  3. result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1) 

take,top,takeOrdered

take返回RDD的前N个元素 takeOrdered默认返回升序排序的前N个元素,可以指定排序算法 Top返回降序排序的前N个元素

  1. var input=sc.parallelize(Array(1,2,3,4,9,8,7,5,6))  
  2. scala> var result=input.take(6) result: Array[Int] = Array(1, 2, 3, 4, 9, 8) 
  3. scala> var result=input.take(20) result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6) 
  4.  scala> var result=input.takeOrdered(6) 
  5. result: Array[Int] = Array(1, 2, 3, 4, 5, 6) scala> var result=input.takeOrdered(6)(Ordering[Int].reverse) 
  6. result: Array[Int] = Array(9, 8, 7, 6, 5, 4)  
  7. scala> var result=input.top(6) result: Array[Int] = Array(9, 8, 7, 6, 5, 4 

Filter

传入返回值为boolean的函数,返回改函数结果为true的RDD

  1. scala> var input=sc.parallelize(Array(-1,0,1,2)) scala> var result=input.filter(_>0).collect() 
  2. result: Array[Int] = Array(1, 2) 

map,flatmap

map对每个元素执行函数,转换为新的RDD,flatMap和map类似,但会把map的返回结果做flat处理,就是把多个Seq的结果拼接成一个Seq输出

  1. scala> var input=sc.parallelize(Array(-1,0,1,2)) scala> var result=input.map(_+1).collect 
  2. result: Array[Int] = Array(0, 1, 2, 3)  
  3. scala>var result=input.map(x=>x.to(3)).collect result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3)) 
  4.  scala>var result=input.flatMap(x=>x.to(3)).collect 
  5. result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3) 

distinct

RDD去重

  1. scala>var input=sc.parallelize(Array(-1,0,1,2,2)) scala>var result=input.distinct.collect 
  2. result: Array[Int] = Array(0, 1, 2, -1) 

Reduce

通过函数聚集RDD中的所有元素

  1. scala> var input=sc.parallelize(Array(-1,0,1,2)) scala> var result=input.reduce((x,y)=>{println(x,y);x+y}) 
  2. (-1,1)  //处理-1,1,结果为0,RDD剩余元素为{0,2} (0,2)   //上面的结果为0,在处理0,2,结果为2,RDD剩余元素为{0} 
  3. (2,0)   //上面结果为2,再处理(2,0),结果为2,RDD剩余元素为{} result: Int = 2 

sample,takeSample

sample就是从RDD中抽样,***个参数withReplacement是指是否有放回的抽样,true为放回,为false为不放回,放回就是抽样结果可能重复,第二个参数是fraction,0到1之间的小数,表明抽样的百分比 takeSample类似,但返回类型是Array,***个参数是withReplacement,第二个参数是样本个数

  1. var rdd=sc.parallelize(1 to 20)  
  2. scala> rdd.sample(true,0.5).collect res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20) 
  3.  scala> rdd.sample(false,0.5).collect 
  4. res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18)  
  5. scala> rdd.sample(true,1).collect res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20) 
  6.  scala> rdd.sample(false,1).collect 
  7. res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)  
  8. scala> rdd.takeSample(true,3) res1: Array[Int] = Array(1, 15, 19) 
  9.  scala> rdd.takeSample(false,3) 
  10. res2: Array[Int] = Array(7, 16, 6) 

collectAsMap,countByKey,lookup

collectAsMap把PairRDD转为Map,如果存在相同的key,后面的会覆盖前面的。 countByKey统计每个key出现的次数 Lookup返回给定key的所有value

  1. scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))  
  2. scala> var result=input.collectAsMap result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three) 
  3.  scala> var result=input.countByKey 
  4. result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1)  
  5. scala> var result=input.lookup(1) result: Seq[String] = WrappedArray(1, one) 
  6.  scala> var result=input.lookup(2) 
  7. result: Seq[String] = WrappedArray(two) 

groupBy,keyBy

groupBy根据传入的函数产生的key,形成元素为K-V形式的RDD,然后对key相同的元素分组 keyBy对每个value,为它加上key

  1. scala> var rdd=sc.parallelize(List("A1","A2","B1","B2","C")) scala> var result=rdd.groupBy(_.substring(0,1)).collect 
  2. result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C)))  
  3. scala> var rdd=sc.parallelize(List("hello","world","spark","is","fun")) scala> var result=rdd.keyBy(_.length).collect 
  4. result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun)) 

keys,values

  1. scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala> var result=input.keys.collect 
  2. result: Array[Int] = Array(1, 1, 2, 3, 4) scala> var result=input.values.collect 
  3. result: Array[String] = Array(1, one, two, three, four)  
  4. mapvalues mapvalues对K-V形式的RDD的每个Value进行操作 
  5. scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala> var result=input.mapValues(_*2).collect 
  6. result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour)) 

union,intersection,subtract,cartesian

union合并2个集合,不去重 subtract将***个集合中的同时存在于第二个集合的元素去掉 intersection返回2个集合的交集 cartesian返回2个集合的笛卡儿积

  1. scala> var rdd1=sc.parallelize(Array(-1,1,1,2,3)) scala> var rdd2=sc.parallelize(Array(0,1,2,3,4)) 
  2.  scala> var result=rdd1.union(rdd2).collect 
  3. result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4)  
  4. scala> var result=rdd1.intersection(rdd2).collect result: Array[Int] = Array(1, 2, 3) 
  5.  scala> var result=rdd1.subtract(rdd2).collect 
  6. result: Array[Int] = Array(-1)  
  7. scala> var result=rdd1.cartesian(rdd2).collect result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4)) 
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/796652.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号