RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存 在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
package persist
import org.apache.spark.{SparkConf, SparkContext}
object Persist1 {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(sparkConf)
val list=List("Hello Scala","Hello Spark")
val rdd=sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(
data=>{
println("@@@@@@@@@@@@@@@")
(data,1)
}
)
val reduceRDD = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("*********************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
rdd不保存数据,会重复执行,降低效率
修改
package persist
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Persist1 {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(sparkConf)
val list=List("Hello Scala","Hello Spark")
val rdd=sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(
data=>{
println("@@@@@@@@@@@@@@@")
(data,1)
}
)
//cache 默认持久化的操作,只能将数据保存到内存中,如果想要保存到磁盘文件,需要更改存储的级别
//mapRDD.cache()
mapRDD.persist(StorageLevel.MEMORY_AND_DISK)
val reduceRDD = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("*********************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
不重复执行
rdd对象的持久化操作不一定是为了重复使用,在数据执行较长,或数据比较重要的场合也可以采用持久化操作
RDD CheckPoint检查点所谓的检查点其实就是通过将 RDD 中间结果写入磁盘 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点 之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
package persist
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
object Persist2 {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setCheckpointDir("cp")
val list=List("Hello Scala","Hello Spark")
val rdd=sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(
data=>{
println("@@@@@@@@@@@@@@@")
(data,1)
}
)
mapRDD.checkpoint()
//checkpoint 需要添加落盘,需要指定检查点保存的路径
//检查点路劲保存的文件,当作业执行完毕后,不会被删除
//一般保存路径都是在分布式存储系统:HDFS
val reduceRDD = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("*********************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
缓存和检查点区别
1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。 会在血缘关系中添加新的依赖,一旦出现问题可以重新读取数据
2)persist:将数据临时存储在磁盘文件中进行数据重用。涉及到磁盘IO,性能较低,但是数据安全。如果作业执行完毕,临时保存的数据文件就会丢失
3)checkpoint:将数据长久的保存在磁盘文件中进行数据重用。涉及到磁盘IO,性能较低,但是数据安全。为了保证数据安全,所以一般情况下,会独立执行作业。为了能提高效率,一般情况下,联合cache联合使用。执行过程当中,会切断血缘关系,重新建立新的血缘关系,checkpoint等同于改变数据源
RDD分区器Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认 分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分 区,进而决定了 Reduce 的个数。 ➢ 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None ➢ 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
package part
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Part1 {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd=sc.makeRDD(List(
("nba","xxxxxxx"),
("cba","xxxxxxx"),
("wnba","xxxxxxx"),
("nba","xxxxxxx"),
),3)
val parRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)
parRDD.saveAsTextFile("output")
sc.stop()
}
}
class MyPartitioner extends Partitioner{
//分区数量
override def numPartitions: Int = 3
//根据数据的key值 返回数据的分区索引(从0开始)
override def getPartition(key: Any): Int = {
if(key=="nba"){
0
}else if(key=="wnba"){
1
}else if(key=="cba"){
2
}else{
2
}
}
}
文件读取与保存
sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass]。
object 对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T: ClassTag]函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用 saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc: SparkContext =new SparkContext(sparkConf)
val rdd: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("b",2),("c",3)))
rdd.saveAsTextFile("output1")
rdd.saveAsObjectFile("output2")
rdd.saveAsSequenceFile("output3")
val rdd1:RDD[String]=sc.textFile("output1")
val rdd2:RDD[(String,Int)]=sc.objectFile("output2")
val rdd3:RDD[(String,Int)]=sc.sequenceFile("output3")
println(rdd1.collect().mkString(","))
println(rdd2.collect().mkString(","))
println(rdd3.collect().mkString(","))
sc.stop()
}
累加器
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
var sum: Int = 0;
rdd.foreach(num => {
sum = num + sum
})
println(sum)
sc.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
//获取累加器
//Spark默认提供简单数据聚合的累加器
val sumAcc: LongAccumulator = sc.longAccumulator("sum")
rdd.foreach(
num=>{
//使用累加器
sumAcc.add(num)
}
)
println(sumAcc.value)
sc.stop()
}
多加和少加
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
//获取累加器
//Spark默认提供简单数据聚合的累加器
val sumAcc: LongAccumulator = sc.longAccumulator("sum")
// rdd.foreach(
// num=>{
// //使用累加器
// sumAcc.add(num)
// }
// )
// println(sumAcc.value)
val mapRDD=rdd.map(
num=>{
sumAcc.add(num)
}
)
//少加:转换算子中调用累加器,如果没有行动算子的话,不会执行
println(sumAcc.value)
//多加:转换算子中调用累加器,如果多次调用行动算子,会重复计算
mapRDD.collect()
mapRDD.collect()
println(sumAcc.value)
sc.stop()
}
自定义累加器
object WordCount4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("Acc")
val sc=new SparkContext(conf)
val rdd=sc.makeRDD(List("hello","spark","hello"))
//创建累加器对象
val wcAcc=new MyAccumulator
//向Spark进行注册
sc.register(wcAcc,"wordCountAcc")
rdd.foreach(
word=>{
//数据累加
wcAcc.add(word)
}
)
//获取累加的结果
println(wcAcc.value)
sc.stop()
}
}
class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{
private var wcMap: mutable.Map[String, Long] =mutable.Map[String,Long]()
//判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
//重置累加器
override def reset(): Unit = wcMap.clear()
//获取累加器需要计算的值
override def add(v: String): Unit = {
val newCnt=wcMap.getOrElse(v,0L)+1
wcMap.update(v,newCnt)
}
//Driver合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1: mutable.Map[String, Long] =this.wcMap
val map2=other.value
map2.foreach{
case (word,count)=>{
val newCount: Long =map1.getOrElse(word,0L)+count
map1.update(word,newCount)
}
}
}
override def value: mutable.Map[String, Long] = wcMap
}
广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个 或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务 分别发送。
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd1: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd2: RDD[(String, Int)] =sc.makeRDD(List(("a",4),("b",5),("c",6)))
//join会导致数据量的集合增长,并且会影响shuffle的性能,不推荐使用
// val joinRDD: RDD[(String, (Int, Int))] =rdd1.join(rdd2)
// joinRDD.collect().foreach(println)
val map: mutable.Map[String, Int] =mutable.Map(("a",4),("b",5),("c",6))
rdd1.map{
case (w,c)=>{
val l: Int = map.getOrElse(w, 0)
(w,(c,l))
}
}.collect().foreach(println)
//封装广播变量
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
rdd2.map{
case (w,c)=>
val l: Int = bc.value.getOrElse(w, 0)
(w,(c,l))
}.collect().foreach(println)
sc.stop()
}
热门商品
package rep
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object HotCategoryTop10Analysis2 {
def main(args: Array[String]): Unit = {
//TODO Top10热门商品
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategory")
val sc: SparkContext = new SparkContext(sparkConf)
//1、读取原始数据
val actionRDD: RDD[String] = sc.textFile("datas/user_visit_action.txt")
//2将数据转换结构
//点击的场合:(品类id,(1,0,0))
//下单的场合:(品类id,(0,1,0))
//支付的场合:(品类id,(0,0,1))
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {
val datas: Array[String] = action.split("_")
if (datas(6) != "-1") {
//点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
//下单的场合
val ids: Array[String] = datas(8).split("_")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
//支付的场合
val ids = datas(10).split("_")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
})
//3、将相同的品类id的数据进行分组聚合
// (品类id,(点击数量,下单数量,支付数量))
val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
)
//4、将统计结果根据数量进行降序处理,取前10
val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)
//6、将结果采集到控制台打印
resultRDD.foreach(println)
sc.stop()
}
}
package rep
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
//TODO Top10热门商品
val sparkConf: SparkConf =new SparkConf().setMaster("local[*]").setAppName("HotCategory")
val sc: SparkContext =new SparkContext(sparkConf)
//1、读取原始数据
val actionRDD: RDD[String] = sc.textFile("datas/user_visit_action.txt")
//2、统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD: RDD[String] = actionRDD.filter(
action => {
val datas: Array[String] = action.split("_")
datas(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas: Array[String] = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)
//3、统计品类的下单数量:(品类ID,下单数据)
val orderActionRDD: RDD[String] = actionRDD.filter(
action => {
val datas: Array[String] = action.split("_")
datas(8) != "null"
}
)
val orderCountRDD: RDD[(String, Int)] = orderActionRDD.flatMap(
action => {
val datas: Array[String] = action.split("_")
val cid: String = datas(8)
val cids: Array[String] = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
//4、统计品类的支付数据:(品类id,支付数据)
val payActionRDD=actionRDD.filter(
action=>{
val datas: Array[String] =action.split("_")
datas(10)!="null"
}
)
val payCountRDD=payActionRDD.flatMap(
action=>{
val datas: Array[String] =action.split("_")
val cid: String =datas(10)
val cids: Array[String] =cid.split(",")
cids.map(id=>(id,1))
}
).reduceByKey(_+_)
//5、将品类排序,并取前10名
//点击数量 ,下单数量,支付数量
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
clickCountRDD.cogroup(orderCountRDD, payCountRDD)
val analysisRDD: RDD[(String, (Int, Int, Int))] = cogroupRDD.mapValues({
case (clickIter, orderIter, payIter) => {
var clickCnt: Int = 0
val iter1: Iterator[Int] = clickIter.iterator
if (iter1.hasNext) {
clickCnt = iter1.next()
}
var orderCnt= 0
val iter2: Iterator[Int] = orderIter.iterator
if (iter2.hasNext) orderCnt = iter2.next()
var payCnt: Int = 0
val iter3: Iterator[Int] = payIter.iterator
if (iter3.hasNext) payCnt = iter3.next()
(clickCnt, orderCnt, payCnt)
}
})
val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)
//6、将结果采集到控制台打印
resultRDD.foreach(println)
sc.stop()
}
}
def main(args: Array[String]): Unit = {
//TODO Top10热门商品
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategory")
val sc: SparkContext = new SparkContext(sparkConf)
//1、读取原始数据
val actionRDD: RDD[String] = sc.textFile("datas/user_visit_action.txt")
//2将数据转换结构
//点击的场合:(品类id,(1,0,0))
//下单的场合:(品类id,(0,1,0))
//支付的场合:(品类id,(0,0,1))
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {
val datas: Array[String] = action.split("_")
if (datas(6) != "-1") {
//点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
//下单的场合
val ids: Array[String] = datas(8).split("_")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
//支付的场合
val ids = datas(10).split("_")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
})
//3、将相同的品类id的数据进行分组聚合
// (品类id,(点击数量,下单数量,支付数量))
val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
)
//4、将统计结果根据数量进行降序处理,取前10
val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)
//6、将结果采集到控制台打印
resultRDD.foreach(println)
sc.stop()
}



