栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark(五)

spark(五)

RDD持久化 RDD Cache缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存 在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

package persist

import org.apache.spark.{SparkConf, SparkContext}

object Persist1 {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc: SparkContext = new SparkContext(sparkConf)

    val list=List("Hello Scala","Hello Spark")

    val rdd=sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(
      data=>{
        println("@@@@@@@@@@@@@@@")
        (data,1)
      }
    )

    val reduceRDD = mapRDD.reduceByKey(_ + _)

    reduceRDD.collect().foreach(println)

    println("*********************")

    val groupRDD = mapRDD.groupByKey()

    groupRDD.collect().foreach(println)

    sc.stop()
  }

}

rdd不保存数据,会重复执行,降低效率

修改

package persist

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object Persist1 {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc: SparkContext = new SparkContext(sparkConf)

    val list=List("Hello Scala","Hello Spark")

    val rdd=sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(
      data=>{
        println("@@@@@@@@@@@@@@@")
        (data,1)
      }
    )

    //cache 默认持久化的操作,只能将数据保存到内存中,如果想要保存到磁盘文件,需要更改存储的级别
    //mapRDD.cache()
    mapRDD.persist(StorageLevel.MEMORY_AND_DISK)

    val reduceRDD = mapRDD.reduceByKey(_ + _)

    reduceRDD.collect().foreach(println)

    println("*********************")

    val groupRDD = mapRDD.groupByKey()


    groupRDD.collect().foreach(println)

    sc.stop()
  }

}


不重复执行

rdd对象的持久化操作不一定是为了重复使用,在数据执行较长,或数据比较重要的场合也可以采用持久化操作

RDD CheckPoint检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点 之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

package persist

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object Persist2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setCheckpointDir("cp")

    val list=List("Hello Scala","Hello Spark")

    val rdd=sc.makeRDD(list)

    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map(
      data=>{
        println("@@@@@@@@@@@@@@@")
        (data,1)
      }
    )
    mapRDD.checkpoint()
    //checkpoint 需要添加落盘,需要指定检查点保存的路径
    //检查点路劲保存的文件,当作业执行完毕后,不会被删除
    //一般保存路径都是在分布式存储系统:HDFS


    val reduceRDD = mapRDD.reduceByKey(_ + _)

    reduceRDD.collect().foreach(println)

    println("*********************")

    val groupRDD = mapRDD.groupByKey()
    groupRDD.collect().foreach(println)
    sc.stop()
  }
}

缓存和检查点区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。 会在血缘关系中添加新的依赖,一旦出现问题可以重新读取数据

2)persist:将数据临时存储在磁盘文件中进行数据重用。涉及到磁盘IO,性能较低,但是数据安全。如果作业执行完毕,临时保存的数据文件就会丢失

3)checkpoint:将数据长久的保存在磁盘文件中进行数据重用。涉及到磁盘IO,性能较低,但是数据安全。为了保证数据安全,所以一般情况下,会独立执行作业。为了能提高效率,一般情况下,联合cache联合使用。执行过程当中,会切断血缘关系,重新建立新的血缘关系,checkpoint等同于改变数据源

RDD分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认 分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分 区,进而决定了 Reduce 的个数。 ➢ 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None ➢ 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

package part

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Part1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(
      ("nba","xxxxxxx"),
      ("cba","xxxxxxx"),
      ("wnba","xxxxxxx"),
      ("nba","xxxxxxx"),
    ),3)

    val parRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)

    parRDD.saveAsTextFile("output")

    sc.stop()



  }

}


class MyPartitioner extends Partitioner{

  //分区数量
  override def numPartitions: Int = 3

  //根据数据的key值 返回数据的分区索引(从0开始)
  override def getPartition(key: Any): Int = {
    if(key=="nba"){
      0
    }else if(key=="wnba"){
      1
    }else if(key=="cba"){
      2
    }else{
      2
    }

  }
}

文件读取与保存

sequence 文件

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass]。

object 对象文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T: ClassTag]函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用 saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc: SparkContext =new SparkContext(sparkConf)

    val rdd: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("b",2),("c",3)))


    rdd.saveAsTextFile("output1")
    rdd.saveAsObjectFile("output2")
    rdd.saveAsSequenceFile("output3")

    val rdd1:RDD[String]=sc.textFile("output1")
    val rdd2:RDD[(String,Int)]=sc.objectFile("output2")
    val rdd3:RDD[(String,Int)]=sc.sequenceFile("output3")

    println(rdd1.collect().mkString(","))
    println(rdd2.collect().mkString(","))
    println(rdd3.collect().mkString(","))
    
    sc.stop()
  }
累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    var sum: Int = 0;
    rdd.foreach(num => {
      sum = num + sum
    })

    println(sum)

    sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    //获取累加器
    //Spark默认提供简单数据聚合的累加器

    val sumAcc: LongAccumulator = sc.longAccumulator("sum")

    rdd.foreach(
      num=>{
        //使用累加器
        sumAcc.add(num)
      }
    )
    println(sumAcc.value)

    sc.stop()
  }

多加和少加

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

    //获取累加器
    //Spark默认提供简单数据聚合的累加器

    val sumAcc: LongAccumulator = sc.longAccumulator("sum")

//    rdd.foreach(
//      num=>{
//        //使用累加器
//        sumAcc.add(num)
//      }
//    )
//    println(sumAcc.value)

    val mapRDD=rdd.map(
      num=>{
        sumAcc.add(num)
      }
    )

    //少加:转换算子中调用累加器,如果没有行动算子的话,不会执行
    println(sumAcc.value)
    //多加:转换算子中调用累加器,如果多次调用行动算子,会重复计算
    mapRDD.collect()
    mapRDD.collect()
    println(sumAcc.value)

    sc.stop()
  }

自定义累加器
object WordCount4 {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc=new SparkContext(conf)

    val rdd=sc.makeRDD(List("hello","spark","hello"))

    //创建累加器对象
    val wcAcc=new MyAccumulator
    //向Spark进行注册
    sc.register(wcAcc,"wordCountAcc")

    rdd.foreach(
      word=>{
        //数据累加
        wcAcc.add(word)
      }
    )

    //获取累加的结果
    println(wcAcc.value)

    sc.stop()


  }

}

class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{

  private var wcMap: mutable.Map[String, Long] =mutable.Map[String,Long]()

  //判断是否初始状态
  override def isZero: Boolean = {
    wcMap.isEmpty
  }

  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new MyAccumulator()
  }

  //重置累加器
  override def reset(): Unit = wcMap.clear()

  //获取累加器需要计算的值
  override def add(v: String): Unit = {

    val newCnt=wcMap.getOrElse(v,0L)+1
    wcMap.update(v,newCnt)

  }

  //Driver合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map1: mutable.Map[String, Long] =this.wcMap
    val map2=other.value

    map2.foreach{
      case (word,count)=>{
        val newCount: Long =map1.getOrElse(word,0L)+count
        map1.update(word,newCount)
      }
    }


  }

  override def value: mutable.Map[String, Long] = wcMap
}
广播变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个 或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务 分别发送。

 def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd1: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("b",2),("c",3)))
    val rdd2: RDD[(String, Int)] =sc.makeRDD(List(("a",4),("b",5),("c",6)))

    //join会导致数据量的集合增长,并且会影响shuffle的性能,不推荐使用
//    val joinRDD: RDD[(String, (Int, Int))] =rdd1.join(rdd2)
//    joinRDD.collect().foreach(println)

    val map: mutable.Map[String, Int] =mutable.Map(("a",4),("b",5),("c",6))
    rdd1.map{
      case (w,c)=>{
        val l: Int = map.getOrElse(w, 0)
        (w,(c,l))
      }
    }.collect().foreach(println)

    //封装广播变量
    val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    rdd2.map{
      case (w,c)=>
        val l: Int = bc.value.getOrElse(w, 0)
        (w,(c,l))
    }.collect().foreach(println)
    sc.stop()
  }
热门商品
package rep

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HotCategoryTop10Analysis2 {

  def main(args: Array[String]): Unit = {
    //TODO Top10热门商品
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategory")

    val sc: SparkContext = new SparkContext(sparkConf)

    //1、读取原始数据
    val actionRDD: RDD[String] = sc.textFile("datas/user_visit_action.txt")

    //2将数据转换结构
    //点击的场合:(品类id,(1,0,0))
    //下单的场合:(品类id,(0,1,0))
    //支付的场合:(品类id,(0,0,1))
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {
      val datas: Array[String] = action.split("_")
      if (datas(6) != "-1") {
        //点击的场合
        List((datas(6), (1, 0, 0)))
      } else if (datas(8) != "null") {
        //下单的场合
        val ids: Array[String] = datas(8).split("_")
        ids.map(id => (id, (0, 1, 0)))
      } else if (datas(10) != "null") {
        //支付的场合
        val ids = datas(10).split("_")
        ids.map(id => (id, (0, 0, 1)))

      } else {
        Nil
      }
    })

    //3、将相同的品类id的数据进行分组聚合
    // (品类id,(点击数量,下单数量,支付数量))
    val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
    )


    //4、将统计结果根据数量进行降序处理,取前10


    val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)

    //6、将结果采集到控制台打印
    resultRDD.foreach(println)
    sc.stop()

  }

}
package rep

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HotCategoryTop10Analysis {

  def main(args: Array[String]): Unit = {
    //TODO Top10热门商品
    val sparkConf: SparkConf =new SparkConf().setMaster("local[*]").setAppName("HotCategory")

    val sc: SparkContext =new SparkContext(sparkConf)

    //1、读取原始数据
    val actionRDD: RDD[String] = sc.textFile("datas/user_visit_action.txt")

    //2、统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD: RDD[String] = actionRDD.filter(
      action => {
        val datas: Array[String] = action.split("_")
        datas(6) != "-1"
      }
    )

    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas: Array[String] = action.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    //3、统计品类的下单数量:(品类ID,下单数据)
    val orderActionRDD: RDD[String] = actionRDD.filter(
      action => {
        val datas: Array[String] = action.split("_")
        datas(8) != "null"
      }
    )


    val orderCountRDD: RDD[(String, Int)] = orderActionRDD.flatMap(
      action => {
        val datas: Array[String] = action.split("_")
        val cid: String = datas(8)
        val cids: Array[String] = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    //4、统计品类的支付数据:(品类id,支付数据)
    val payActionRDD=actionRDD.filter(
      action=>{
        val datas: Array[String] =action.split("_")
        datas(10)!="null"
      }
    )

    val payCountRDD=payActionRDD.flatMap(
      action=>{
        val datas: Array[String] =action.split("_")
        val cid: String =datas(10)
        val cids: Array[String] =cid.split(",")
        cids.map(id=>(id,1))
      }
    ).reduceByKey(_+_)

    //5、将品类排序,并取前10名
    //点击数量 ,下单数量,支付数量
    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
    clickCountRDD.cogroup(orderCountRDD, payCountRDD)

    val analysisRDD: RDD[(String, (Int, Int, Int))] = cogroupRDD.mapValues({
      case (clickIter, orderIter, payIter) => {
        var clickCnt: Int = 0
        val iter1: Iterator[Int] = clickIter.iterator
        if (iter1.hasNext) {
          clickCnt = iter1.next()
        }

        var orderCnt= 0
        val iter2: Iterator[Int] = orderIter.iterator
        if (iter2.hasNext) orderCnt = iter2.next()

        var payCnt: Int = 0
        val iter3: Iterator[Int] = payIter.iterator
        if (iter3.hasNext) payCnt = iter3.next()
        (clickCnt, orderCnt, payCnt)
      }
    })

    val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)

    //6、将结果采集到控制台打印
    resultRDD.foreach(println)
    sc.stop()

  }

}
  def main(args: Array[String]): Unit = {
    //TODO Top10热门商品
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategory")

    val sc: SparkContext = new SparkContext(sparkConf)

    //1、读取原始数据
    val actionRDD: RDD[String] = sc.textFile("datas/user_visit_action.txt")

    //2将数据转换结构
    //点击的场合:(品类id,(1,0,0))
    //下单的场合:(品类id,(0,1,0))
    //支付的场合:(品类id,(0,0,1))
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {
      val datas: Array[String] = action.split("_")
      if (datas(6) != "-1") {
        //点击的场合
        List((datas(6), (1, 0, 0)))
      } else if (datas(8) != "null") {
        //下单的场合
        val ids: Array[String] = datas(8).split("_")
        ids.map(id => (id, (0, 1, 0)))
      } else if (datas(10) != "null") {
        //支付的场合
        val ids = datas(10).split("_")
        ids.map(id => (id, (0, 0, 1)))

      } else {
        Nil
      }
    })

    //3、将相同的品类id的数据进行分组聚合
    // (品类id,(点击数量,下单数量,支付数量))
    val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
    )


    //4、将统计结果根据数量进行降序处理,取前10


    val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)

    //6、将结果采集到控制台打印
    resultRDD.foreach(println)
    sc.stop()

  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/329320.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号