栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021.12.8 RDD持久化:Spark Cache缓存、CheckPoint检查点,RDD共享变量:累加器、广播变量

2021.12.8 RDD持久化:Spark Cache缓存、CheckPoint检查点,RDD共享变量:累加器、广播变量

目录

 Cache缓存

 CheckPoint检查点

 累加器、广播变量


 

 Cache缓存

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {

    val conf:SparkConf=new SparkConf().setMaster("local[*]").setAppName("cacheDemo")
    val sc:SparkContext=SparkContext.getOrCreate(conf)


    val rdd1:RDD[String]=sc.textFile("in/users.csv").cache()
    var start:Long=System.currentTimeMillis()
    println(rdd1.count())
    var end:Long=System.currentTimeMillis()
    println("count操作花费的时间:"+(end-start)+"毫秒")

    start=System.currentTimeMillis()
    println(rdd1.count())
    end=System.currentTimeMillis()
    println("第二次count操作花费的时间:"+(end-start)+"毫秒")

    start=System.currentTimeMillis()
    println(rdd1.count())
    end=System.currentTimeMillis()
    println("第三次count操作花费的时间:"+(end-start)+"毫秒")

    start=System.currentTimeMillis()
    rdd1.unpersist()
    println(rdd1.count())
    end=System.currentTimeMillis()
    println("第四次count操作花费的时间:"+(end-start)+"毫秒")





  }

}

 CheckPoint检查点

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckPointDemo {
  def main(args: Array[String]): Unit = {

    val conf:SparkConf=new SparkConf().setMaster("local[*]").setAppName("cacheDemo")
    val sc:SparkContext=SparkContext.getOrCreate(conf)


    sc.setCheckpointDir("file:///D://KB15checkpoint")
//    val rdd:RDD[(String,Int)]=sc.parallelize(Array(("a",1),("b",2),("c",3)))
    val rdd=sc.textFile("in/users.csv")
    rdd.checkpoint()
    println(rdd.count()) //行动算子
    println("是否是checkpoint"+rdd.isCheckpointed)
    println("检查点文件"+rdd.getCheckpointFile)

    println("--------------------------------------------------")

//    val rdd2:RDD[String]=sc.textFile("file:/D:\KB15checkpoint\1067856c-a458-4e63-bde8-68e1dc10ea80\rdd-0")
    val rdd2:RDD[String]=sc.textFile(rdd.getCheckpointFile.get)
    println(rdd2.count())
    rdd2.collect().foreach(println)




  }

}

 累加器、广播变量

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object BroadCastDemo {
  def main(args: Array[String]): Unit = {

    val conf:SparkConf=new SparkConf().setMaster("local[*]").setAppName("cacheDemo")
    val sc:SparkContext=SparkContext.getOrCreate(conf)  //

    val myAccumul: Accumulator[Int] = sc.accumulator(0,"MyAccumul")

    var mycount=1

    sc.parallelize(Array(1,2,3,4)).foreach(
      x=>{
        println("x:" + x)
        println("mycount: " +mycount)
        mycount += 1
        println("myAccumul: "+myAccumul)
        myAccumul += x
      }
    )

    println(mycount,myAccumul)


    
    //    val arr=Array("hello","hi","come on baby")
//    val broadCastVar:Broadcast[Array[String]]=sc.broadcast(arr)  //不要广播太大的东西  提前准备好东西
//
//    val hi="how are you"
//
//    val rdd=sc.parallelize(Array((1,"teacher"),(2,"worker"),(3,"teamleader")))
//
//    val rdd2: RDD[(Int, String)] = rdd.mapValues(x => {
//      println("value is :" + x)
      hi + ":" + x  //自己携带资料去
//      broadCastVar.value(2) +":" + x   //工作资料已经提前准备好
//    })
//
//    rdd2.collect().foreach(println)

  }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650922.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号