2、算子(1)parallelize:从集合中创建RDD
(2)makeRDD:从集合中创建RDD(底层还是parallelize)(第二个参数:分区的数量)
(3)textFile的方式读取文件创建RDD
3、算子应用(1)转换算子(Transformations):将一个RDD变成另一个RDD,RDD之间的转换----懒执行,需要行为算子触发执行
(2)操作算子(Actions行为算子):触发任务的调度和作业的执行,不能一个RDD变成另一个 collect,take
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果
(1)map
(2)flatMap
(3)mapPartition(转换算子)//按分区去处理数据
//有时需要从外部获取数据
(4)foreachPartition(转换算子)
//每一个分区由一个task去处理数据,相当于每一个分区会创建一个连接
//适用于一般需要将数据写入到外部系统
//有时在处理数据的时候需要和外部系统建立连接
//如果连接建立在Driver端,连接是不能被序列化的,及在算子内部无法使用算子外部创建的连接
//那么可以使用MapPartition,foreachPartition算子降低连接的创建与销毁次数,提高效率
(5)mapPartitionsWithIndex(转换算子):会在遍历数据的时候给分区加上一个索引index
val stuRDD: RDD[String] = sc.textFile("sparkproject/data/students.txt")
stuRDD.mapPartitionsWithIndex((index,rdd)=>{
println("当前遍历的分区"+index)
rdd.map(line=>line.split(",")(1))
}).foreach(println)
(6)filter(转换算子):过滤数据
(7)sample(转换算子):对数据进行取样
val stuRDD: RDD[String] = sc.textFile("sparkproject/data/students.txt")
val smaRDD = stuRDD.sample(false,0.1)
smaRDD.foreach(println)
(8)union(转换算子):将两个RDD首尾相连成一个RDD,两个RDD结构需要一样
(9)join(转换算子):关联RDD
(10)groupBy
stuRDD.groupBy(line => line.split(",")(4))
.map(kv => (kv._1, kv._2.size))
.foreach(println)
groupByKey:作用在key-value格式上,默认按key分组
stuRDD.map(line => (line.split(",")(4), line))
.groupByKey()
.map(ke => (ke._1, ke._2.size))
.foreach(println)
reduceByKey:需要接收一个函数(聚合函数),按key分组,后面1+1...(一般加和,max,min等)
stuRDD.map(line => (line.split(",")(4), 1))
.reduceByKey((x: Int, y: Int) => x + y)
.foreach(println)
reduecBykey会进行预聚合数据,而groupByKey不会,效率比groupByKey,但是功能弱`在这里插入代码片`
(11)sortBy(转换算子)
//设置参数降序
stuRDD.sortBy(line=>line.split(",")(0),ascending = false)
.foreach(println)
sortByKey直接以key进行排序,默认升序
stuRDD.map(line=>(line.split(",")(0),line))
.sortByKey(ascending = false)
.foreach(println)
(12)mapValue(转换算子):作用在key-value格式上,传入一个函数,将RDD的value传给函数,key不变,数据规模也不变
(13)操作算子:foreach,take,count(返回RDD的长度)
collect:将RDD转化为数组Array(将数据拿到内存,注意内存大小)
reduce:全局聚合
saveAsTextFile:数据保存
(14)countByKey:求wordcount的,最后返回map,是一个行为算子
4、拉链countByKey的源码: 思路很简单,就是先把rdd的value都变成1,
然后reduceByKey,在通过collect行动算子拉取到driver, 最后toMap转换成一个Map类型
相应位置的数据进行组合,拉链操作两个数据类型可以不一致 val list1RDD: RDD[Int] = sc.parallelize(List(1,2,3,4)) val list2RDD: RDD[Int] = sc.parallelize(List(5,6,7,8)) val rdd3: RDD[(Int, Int)] = list1RDD.zip(list2RDD) rdd3.foreach(println)5、思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。



