栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他 > Spark

Spark刷爆磁盘与Java弱引用的关系

Spark 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

 





 

一 引用基本概念





 

如下面,定义两个变量num,str,存储模型大致如下图:

  1. int num = 6; String str = “浪尖聊大数据”; 







 

变量num值直接从6修改为了8;变量str只是修改了其保存的地址,从0x88修改为0x86,对象 “浪尖聊大数据 ”本身还在内存中,并没有被修改。只是内存中新增了对象 “浪尖是帅哥”。

二 值传递&引用传递



 

举例说明引用传递和值传递:

  1. 第一个栗子:基本类型 void foo(int value) { 
  2.     value = 88; } 
  3. foo(num); // num 没有被改变  
  4. 第二个栗子:没有提供改变自身方法的引用类型 void foo(String text) { 
  5.     text = "mac"; } 
  6. foo(str); // str 也没有被改变  
  7. 第三个栗子:提供了改变自身方法的引用类型 StringBuilder sb = new StringBuilder("vivo"); 
  8. void foo(StringBuilder builder) {     builder.append("5"); 
  9. } foo(sb); // sb 被改变了,变成了"vivo5"。 
  10.  第四个栗子:提供了改变自身方法的引用类型,但是不使用,而是使用赋值运算符。 
  11. StringBuilder sb = new StringBuilder("oppo"); void foo(StringBuilder builder) { 
  12.     builder = new StringBuilder("vivo"); } 
  13. foo(sb); // sb 没有被改变,还是 "oppo"。 

三 引用的类型









  1. 单纯的申明一个软引用,指向一个person对象 1 SoftReference pSoftReference=new SoftReference(new Person(“张三”,12)); 
  2.  声明一个引用队列 
  3. ReferenceQueue queue = new ReferenceQueue<>();  
  4. 声明一个person对象,李四,obj是其强引用 Person obj = new Person(“李四”,13); 
  5.  使软引用softRef指向李四对应的对象,并且将该软引用关联到引用队列 
  6. 2 SoftReference softRef = new SoftReference(obj,queue);  
  7. 声明一个person对象,名叫王酒,并保证其仅含软引用,且将软引用关联到引用队列queue 3 SoftReference softRef = new SoftReference(new Person(“王酒”,15),queue); 
  8.  使用很简单softRef.get即可获取对应的value。 



    1. WeakReference weakReference = new WeakReference<>(new Person(“浪尖”,18));  
    2. 声明一个引用队列 ReferenceQueue queue = new ReferenceQueue<>(); 
    3.  声明一个person对象,李四,obj是其强引用 
    4. Person obj = new Person(“李四”,13);  
    5. 声明一个弱引用,指向强引用obj所指向的对象,同时该引用绑定到引用队列queue。 WeakReference weakRef = new WeakReference(obj,queue); 
    6.  使用弱引用也很简单,weakRef.get 



      1. 声明引用队列 ReferenceQueue queue = new ReferenceQueue(); 
      2.  声明一个虚引用 
      3. PhantomReference reference = new PhantomReference(new Person(“浪尖”,18), queue);  
      4. 获取虚引用的值,直接为null,因为无法通过虚引用获取引用对象。 System.out.println(reference.get()); 



       

       

       


       





       

      四 Threadlocal如何使用弱引用


       





       

      五 spark如何使用弱引用进行数据清理

       

       


       



       

      shuffle相关的引用,实际上是在ShuffleDependency内部实现了,shuffle状态注册到ContextCleaner过程:

      1. _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) 

      然后,我们翻开registerShuffleForCleanup函数源码可以看到,注释的大致意思是注册ShuffleDependency目的是在垃圾回收的时候清除掉它对应的数据:

      1.    def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = { 
      2.     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))   } 

      其中,registerForCleanup函数如下:

      1.    private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { 
      2.     referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))   } 

      referenceBuffer主要作用保存CleanupTaskWeakReference弱引用,确保在引用队列没处理前,弱引用不会被垃圾回收。

      1.  
      2.   private val referenceBuffer =     Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap) 

      ContextCleaner内部有一个线程,循环从引用队列里取被垃圾回收的RDD等相关弱引用,然后完成对应的数据清除工作。

      1. private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() } 

      其中,keepCleaning函数,如下:

      1.    private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { 
      2.     while (!stopped) {       try { 
      3.         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))           .map(_.asInstanceOf[CleanupTaskWeakReference]) 
      4.         // Synchronize here to avoid being interrupted on stop()         synchronized { 
      5.           reference.foreach { ref =>             logDebug("Got cleaning task " + ref.task) 
      6.             referenceBuffer.remove(ref)             ref.task match { 
      7.               case CleanRDD(rddId) =>                 doCleanupRDD(rddId, blocking = blockOnCleanupTasks) 
      8.               case CleanShuffle(shuffleId) =>                 doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) 
      9.               case CleanBroadcast(broadcastId) =>                 doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) 
      10.               case CleanAccum(accId) =>                 doCleanupAccum(accId, blocking = blockOnCleanupTasks) 
      11.               case CleanCheckpoint(rddId) =>                 doCleanCheckpoint(rddId) 
      12.             }           } 
      13.         }       } catch { 
      14.         case ie: InterruptedException if stopped => // ignore         case e: Exception => logError("Error in cleaning thread", e) 
      15.       }     } 
      16.   } 

      shuffle数据清除的函数是doCleanupShuffle,具体内容如下:

      1.    def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { 
      2.     try {       logDebug("Cleaning shuffle " + shuffleId) 
      3.       mapOutputTrackerMaster.unregisterShuffle(shuffleId)       shuffleDriverComponents.removeShuffle(shuffleId, blocking) 
      4.       listeners.asScala.foreach(_.shuffleCleaned(shuffleId))       logDebug("Cleaned shuffle " + shuffleId) 
      5.     } catch {       case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) 
      6.     }   } 

      细节就不细展开了。



       

      ContextCleaner的start函数被调用后,实际上启动了一个调度线程,每隔30min主动调用了一次System.gc(),来触发垃圾回收。

      1.    def start(): Unit = { 
      2.     cleaningThread.setDaemon(true)     cleaningThread.setName("Spark Context Cleaner") 
      3.     cleaningThread.start()     periodicGCService.scheduleAtFixedRate(() => System.gc(), 
      4.       periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)   } 

      具体参数是:

      1. spark.cleaner.periodicGC.interval 



      本文转载自微信公众号「浪尖聊大数据」,可以通过以下二维码关注。转载本文请联系浪尖聊大数据公众号。



       

       

      转载请注明:文章转载自 www.mshxw.com
      我们一直用心在做
      关于我们 文章归档 网站地图 联系我们

      版权所有 (c)2021-2022 MSHXW.COM

      ICP备案号:晋ICP备2021003244-6号