Spark RDD operators fall into two categories: Transformations and Actions.
Transformation: lazily evaluated. A Transformation only records metadata (the lineage); the actual computation starts only when an Action triggers the job.
Action: eagerly evaluated. Calling an Action loads the data and runs the computation immediately, returning a result.
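For example (a minimal sketch, assuming a SparkContext named sc as in the examples below): the map call returns instantly without reading any data, and only collect launches a job.
val mapped = sc.parallelize(1 to 4).map(_ * 10)   // Transformation: nothing is computed yet
mapped.collect                                    // Action: the job runs now; expected: Array(10, 20, 30, 40)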
There are two ways to create an RDD:
1. From a file system, via sc.textFile("/root/words.txt").
2. By parallelizing a Scala collection: val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
parallelize
Definition: def parallelize[T](seq: Seq[T],numSlices: Int)(implicit evidence$1: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6,7),2) // split into two partitions
scala> rdd1.map(_*2).collect
res6: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14)
makeRDD
Definition: def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit evidence$3: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
def makeRDD[T](seq: Seq[T],numSlices: Int)(implicit evidence$2: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T]
scala> val rdd3=sc.makeRDD(1 to 10)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24
scala> rdd3.collect
res40: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
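The first overload of makeRDD takes a Seq of (element, preferred hosts) pairs and creates one partition per element, placing it preferentially on the given hosts. A sketch (the host names are made up):
val rdd4 = sc.makeRDD(Seq((1, Seq("host1")), (2, Seq("host2"))))   // hypothetical hosts; one partition per element
rdd4.partitions.length   // expected: 2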
map // processes one element (one row) at a time
Definition: def map[U](f: Int => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6,7),2) // split into two partitions
scala> rdd1.map(_*2).collect
res6: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14)
scala> rdd.map(x=>(x,1)).collect // build key-value pairs; the tuple needs its own parentheses
res17: Array[(Int, Int)] = Array((1,1), (2,1), (9,1), (7,1), (4,1))
mapPartitions // processes one whole partition (a batch of elements) at a time
Definition: def mapPartitions[U](f: Iterator[String] => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$6: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
scala> var rdd1 = sc.makeRDD(1 to 5,4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at mapPartitions at <console>:25
scala> rdd3.collect // the sum of 1 to 5, computed per partition since there are four partitions
res2: Array[Int] = Array(1, 2, 3, 9)
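The same per-partition sum can be written more compactly by building the result Iterator directly (a sketch, reusing rdd1 from above):
val rdd3b = rdd1.mapPartitions(iter => Iterator(iter.sum))   // one sum per partition
rdd3b.collect   // expected: Array(1, 2, 3, 9)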
mapPartitionsWithIndex
Definition: def mapPartitionsWithIndex[U](f: (Int, Iterator[Int]) => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$9: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
var rdd1 = sc.makeRDD(1 to 5, 2) // rdd1 has two partitions
var rdd2 = rdd1.mapPartitionsWithIndex { (x, iter) => {
  var result = List[String]()
  var i = 0
  while (iter.hasNext) {
    i += iter.next()
  }
  result.::(x + "|" + i).iterator
}}
// rdd2 sums the numbers in each partition of rdd1 and prefixes each sum with its partition index
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)
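A more compact equivalent (a sketch, same rdd1 as above):
val rdd2b = rdd1.mapPartitionsWithIndex((idx, iter) => Iterator(idx + "|" + iter.sum))
rdd2b.collect   // expected: Array(0|3, 1|12)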
partitions
Definition: final def partitions: Array[org.apache.spark.Partition]
scala> rdd1.partitions.size
res3: Int = 2
scala> rdd1.partitions.length
res3: Int = 2
repartition(3) // change the number of partitions
Definition: def repartition(numPartitions: Int)(implicit ord: Ordering[Int]): org.apache.spark.rdd.RDD[Int]
scala> rdd.repartition(3)
res17: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:26
scala> res17.glom.collect
res19: Array[Array[Int]] = Array(Array(4), Array(1, 5), Array(2, 3))
sortBy
Definition: def sortBy[K](f: Int => K,ascending: Boolean,numPartitions: Int)(implicit ord: Ordering[K],implicit ctag: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[Int]
scala> val rdd2=sc.parallelize(List(1,2,3,4,5,6,8),2) // split into two partitions
scala> rdd2.sortBy(x=>x,true).collect // ascending
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 8)
scala> rdd2.sortBy(x=>x,false).collect // descending
res11: Array[Int] = Array(8, 6, 5, 4, 3, 2, 1)
scala> rdd2.sortBy(x=>x,false,3).glom.collect // also set the number of partitions
res18: Array[Array[Int]] = Array(Array(8, 6), Array(5, 4), Array(3, 2, 1))
glom
Definition: def glom(): org.apache.spark.rdd.RDD[Array[Int]]
glom turns each partition into a single array; internally it returns a GlommedRDD.
scala> rdd2.sortBy(x=>x,false,3).glom.collect
res18: Array[Array[Int]] = Array(Array(8, 6), Array(5, 4), Array(3, 2, 1))
collect
Definition: def collect[U](f: PartialFunction[Int,U])(implicit evidence$30: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
def collect[B, That](pf: PartialFunction[(Int, Int),B])(implicit bf: scala.collection.generic.CanBuildFrom[Array[(Int, Int)],B,That]): That
def collect[B, That](pf: PartialFunction[(Int, Int),B])(implicit bf: scala.collection.generic.CanBuildFrom[scala.collection.mutable.WrappedArray[(Int, Int)],B,That]): That
def collect(): Array[Int]
scala> rdd.collect
res4: Array[Int] = Array(1, 2, 3, 4, 7, 9)
scala> rdd1.collect({case (5,_) => "A";case (2,_)=>(2,2)}) // keep only the pairs the partial function matches and transform them
res30: Array[java.io.Serializable] = Array((2,2), A)
textFile
Definition: def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]
scala> val rdd1=sc.textFile("
hdfs://192.168.153.133:9000/spark/sparktmp.txt")//从hdfs读取文件
rdd1: org.apache.spark.rdd.RDD[String] =
hdfs://192.168.153.133:9000/spark/sparktmp.txt MapPartitionsRDD[12] at textFile at :24
scala> rdd1.collect
res28: Array[String] = Array(hello java, hi scala, how are you scala, spark)
scala> val rdd1=sc.textFile("
file:///opt/sparktmp/helloworld.txt")//从本地读取文件
rdd1: org.apache.spark.rdd.RDD[String] =
file:///opt/sparktmp/helloworld.txt MapPartitionsRDD[14] at textFile at :24
scala> rdd1.collect
res29: Array[String] = Array(hello java, hi scala, how are you scala, spark)
scala> val rdd2=sc.textFile("
file:///opt/sparktmp/helloworld.txt",4)//可以加上分区
rdd2: org.apache.spark.rdd.RDD[String] =
file:///opt/sparktmp/helloworld.txt MapPartitionsRDD[6] at textFile at :24
scala> rdd2.getNumPartitions
res4: Int = 4
filter
Definition: def filter(f: Int => Boolean): org.apache.spark.rdd.RDD[Int]
scala> rdd3.filter(x=>x>2).collect
res11: Array[Int] = Array(3, 4, 5)
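filter works the same way on the text RDD read in the textFile section, e.g. keeping only the lines that mention scala (a sketch, assuming that rdd1 is still in scope):
rdd1.filter(_.contains("scala")).collect   // expected: Array(hi scala, how are you scala)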
fold
Definition: def fold(zeroValue: Int)(op: (Int, Int) => Int): Int
Note: the zeroValue is used as the initial value inside every partition and once more when the partition results are merged, so a non-zero zeroValue is added several times (as the trace below shows).
scala> rdd3.fold(5)((x,y)=>{
| val s = "seq_exp = %d + %d"
| println(s.format(x, y))
| x+y})
seq_exp = 5 + 1
seq_exp = 5 + 2
seq_exp = 5 + 3
seq_exp = 5 + 5
seq_exp = 5 + 4
seq_exp = 5 + 6
seq_exp = 11 + 5
seq_exp = 16 + 5
seq_exp = 21 + 7
seq_exp = 28 + 10
seq_exp = 38 + 8
seq_exp = 46 + 9
seq_exp = 55 + 5
res15: Int = 60
scala> rdd.fold(0)((x,y)=>x+y)
res0: Int = 15
foldByKey // for key-value (K, V) RDDs
Definition: def foldByKey(zeroValue: Double)(func: (Double, Double) => Double): org.apache.spark.rdd.RDD[(String, Double)]
def foldByKey(zeroValue: Double,numPartitions: Int)(func: (Double, Double) => Double): org.apache.spark.rdd.RDD[(String, Double)]
def foldByKey(zeroValue: Double,partitioner: org.apache.spark.Partitioner)(func: (Double, Double) => Double): org.apache.spark.rdd.RDD[(String, Double)]
scala> a
res25: Array[(String, Double)] = Array((Alice,90.0), (Bob,100.0), (Tom,93.0), (Alice,95.0), (Bob,70.0), (Jack,98.0))
scala> val rdd=sc.parallelize(a,2)
rdd: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[5] at parallelize at <console>:26
scala> rdd.foldByKey(1000)(_+_).collect
res27: Array[(String, Double)] = Array((Tom,1093.0), (Alice,2185.0), (Bob,2170.0), (Jack,1098.0))
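The zeroValue is folded in once per key per partition: Alice appears in both partitions, so she ends up with 90 + 95 + 2 * 1000 = 2185. With a zeroValue of 0 you get the plain per-key sums (a sketch using the same rdd):
rdd.foldByKey(0.0)(_+_).collect   // per-key sums: Alice 185.0, Bob 170.0, Tom 93.0, Jack 98.0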
reduce
Definition: def reduce(f: (Int, Int) => Int): Int
scala> val rdd=sc.parallelize(1 to 3,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3)
scala> rdd.reduce((x,y)=>x+y)
res2: Int = 6
reduceByKey
Definition: def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val value: RDD[(Int, Int)] = rdd2.map((1, _)).reduceByKey((x,y)=>x+y) // sum the values that share the same key
value.collect().foreach(println)
(1,120)
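With more than one distinct key the grouping becomes visible; a sketch reusing the (name, score) array a from the foldByKey section:
sc.parallelize(a).reduceByKey(_ + _).collect   // per-name sums: (Alice,185.0), (Bob,170.0), (Tom,93.0), (Jack,98.0)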
groupBy // turns the data into key-value pairs, grouped by the computed key
Definition: def groupBy[K](f: Int => K,p: org.apache.spark.Partitioner)(implicit kt: scala.reflect.ClassTag[K],implicit ord: Ordering[K]): org.apache.spark.rdd.RDD[(K, Iterable[Int])]
def groupBy[K](f: Int => K,numPartitions: Int)(implicit kt: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[(K, Iterable[Int])]
def groupBy[K](f: Int => K)(implicit kt: scala.reflect.ClassTag[K]): org.apache.spark.rdd.RDD[(K, Iterable[Int])]
scala> val rdd=sc.parallelize(1 to 3,2)
scala> rdd.groupBy(x=>x+1)
res2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
scala> rdd.groupBy(x=>x+1).collect
res3: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(3)), (2,CompactBuffer(1)), (3,CompactBuffer(2)))
groupByKey
Definition: def groupByKey(): org.apache.spark.rdd.RDD[(Int, Iterable[Int])]
def groupByKey(numPartitions: Int): org.apache.spark.rdd.RDD[(Int, Iterable[Int])]
def groupByKey(partitioner: org.apache.spark.Partitioner): org.apache.spark.rdd.RDD[(Int, Iterable[Int])]
val rdd2=sc.parallelize(Array(18, 19, 20, 21, 22, 20))
rdd2.map((1, _)).groupByKey().collect().foreach(println)
(1,CompactBuffer(18, 19, 20, 21, 22, 20))
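The grouped values can then be aggregated with mapValues, giving the same result as the reduceByKey example above (a sketch):
rdd2.map((1, _)).groupByKey().mapValues(_.sum).collect().foreach(println)   // expected: (1,120)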
combineByKey
Definition: def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(Int, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(Int, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(Int, C)]
import org.apache.spark.rdd.RDD   // needed for the RDD type annotations below

val rdd: RDD[String] = sc.textFile("in/age.csv")
val rdd2 = rdd.map(x => {
  val a = x.split(" ")   // split each line on spaces
  a(1).toInt             // keep field 1, the age
})
val value: RDD[(Int, (Int, Int))] = rdd2.map((1, _)).combineByKey(x => (x, 1),
  (x: (Int, Int), y: Int) => {        // within a partition: x is the (sum, count) accumulator, y is an incoming age
    (x._1 + y, x._2 + 1)
  },
  (x: (Int, Int), y: (Int, Int)) => { // across partitions: merge two (sum, count) accumulators
    (x._1 + y._1, x._2 + y._2)
  })
value.collect().foreach(println)
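Dividing the per-key sum by its count then gives the average age; a sketch continuing from value above:
val avg = value.mapValues { case (sum, cnt) => sum.toDouble / cnt }   // (key, average age)
avg.collect().foreach(println)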