栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Shuffle FetchFailedException解决方案

Spark Shuffle FetchFailedException解决方案

某日遇到一个数据倾斜的SQL, 首先想到的方法就是加大Partition 看看数据hash 之后会不会落得 均匀,所以就将spark.sql.shuffle.partitions从原来的500 加大到2700 .
结果反而失败了, 错误如下:

FetchFailed(BlockManagerId(516, nfjd-hadoop02-node352.jpushoa.com, 7337, None), shuffleId=3, mapId=59, reduceId=917, message=
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:941)
	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.(SortMergeJoinExec.scala:811)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:326)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (usd: 2147483648, max: 2147483648)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
	... 1 more

可以看出是 shuffle 阶段 fetch 数据 导致的内存溢出
一开始我拿到这个错误的时候有点蒙蔽了,按我的理解我加大了shuffle partitions 每个task的数据量应该是有所减少才对的,(而且在spark web ui 上也是如此体现.),那为什么数据量小了反而会在Shuffle Read 阶段Fetch Fail呢?
所以就有了以下一系列的原理性排查:

Spark Shuffle Read 是如何进行的

Spark Shuffle Read 是发生在 Reduce Task 阶段的, 由ReduceTask 所在的executor 需要去Driver 获取Shuffle数据(包括数据大小), 在哪个executor 上.并且去Fetch ,这里有一个机制就是:
当要fetch 的block size 比较大的时候使用的是stream的方式流取到本地磁盘不会一次性加载到内存.防止了内存溢出. 但当block size小于某个阈值时则将整个Block读到内存进行排序等操作. 那这个阈值是多少呢?

就是spark.maxRemoteBlockSizeFetchToMem 默认的大小是: Int.MaxValue - 512
大概就是2G这样

. 源码如下:

//在BlockManager 初始化的时候就会将配置中的  spark.maxRemoteBlockSizeFetchToMem  设置进成员变量private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

/** * Get block from remote block managers as serialized bytes. */

def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {  

 logDebug(s"Getting remote block $blockId")  
 require(blockId != null, "BlockId is null")  
 var runningFailureCount = 0  
 var totalFailureCount = 0   
 
 // 这里通过BlockManager 去与driver的BlockManager 进行调用获取block信息. 
 val locationsAndStatus = master.getLocationsAndStatus(blockId)  
 
 val blockSize = locationsAndStatus.map { b =>    b.status.diskSize.max(b.status.memSize)  }.getOrElse(0L)  
 val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)  
 
 
//这里就是 获取配置 spark.maxRemoteBlockSizeFetchToMem 的值 然后判断block 是否是需要使用流方式spill读取,还是直接读进内存
val tempFileManager = if (blockSize > maxRemoteBlockToMem) {   
remoteBlockTempFileManager 
} else {
null  
}

这么看来好像是这个数据设置的太大了, 假设我的多个Block数据都是1G多的那么多几个 那肯定吃不消,是会内存溢出的. 接下来就把这个值spark.maxRemoteBlockSizeFetchToMem 设置成10m ,使其都进行流的方式读取到磁盘这样就不会导致错误了. em…很不幸,不成功,依旧是失败的…一样的错误, 整个人裂开了…(卡了一天)

继续开始想 ,这里很迷惑,为什么用流的方式读取还是错误呢? 经过了不断地翻阅 , 把问题锁定在了block size 这里了,因为只有这里会有不一样,所以通过阅读当前执行计划中的使用sortmerge join中的SortShuffleWriter.scala中写数据并且记录block size大小的源码:

override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)

      "这里需要去创建一个mapStatus 去返回当前map完的数据输出地址,以及大小等信息"
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }

从上面的 MapStatus跳进来:

private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
    .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }

这里根据一个参数:SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS(对应配置:

spark.shuffle.minNumPartitionsToHighlyCompress 就是这里!设置了一个2000 的阈值)

根据 shuffle partitions 的数量来判断是要使用HighlyCompressedMapStatus
还是CompressedMapStatus

HighlyCompressedMapStatus: 从名字就可以得知是一个高压缩的MapStatus, 适用于2000以上的shuffle partitions的情况,所以只会记录一个平均值的block size,还有某些超过一定阈值大小的block size 配置:spark.shuffle.accurateBlockThreshold
CompressedMapStatus : 这个是会记录所有block size的MapStatus
谜底揭开:原来没有想到的是,在超过shuffle parititon 2000 的情况下尽然会有这样的变化

结论:

Spark 在处理 shuffle partition >2000 的时候为了优化起见并不会记录所有Map阶段产生的Block
大小而是会转而使用HighlyCompressedMapStatus记录.
由参数spark.shuffle.minNumPartitionsToHighlyCompress(默认2000)控制

HighlyCompressedMapStatus 内部也会记录部分超过阈值的block size
,由参数:spark.shuffle.accurateBlockThreshold(默认 100 * 1024 * 1024 B)配置

shuffle read 的时候会根据读取的block size 判断是否要是要使用流读取还是一次性加载到内存 . 由参数:
spark.maxRemoteBlockSizeFetchToMem(默认 Int.MaxValue - 512)

解决办法

知道原因后问题就好解决了,主要从shuffle的数据量和处理shuffle数据的分区数两个角度入手。

减少shuffle数据

思考是否可以使用map side join或是broadcast join来规避shuffle的产生。

将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

SparkSQL和Dataframe的join,group by等操作

通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。

Rdd的join,groupBy,reduceByKey等操作

通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

提高executor的内存

通过spark.executor.memory适当提高executor的memory值。

是否存在数据倾斜的问题

空值是否已经过滤?异常数据(某个key数据特别大)是否可以单独处理?考虑改变数据分区规则

————————————————
版权声明:本文为CSDN博主「JaxMa」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Jaxma/article/details/106827482

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762390.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号