I recently hit a SQL job suffering from data skew. My first instinct was to increase the number of partitions and see whether the data would hash out more evenly, so I raised spark.sql.shuffle.partitions from the original 500 to 2700.
Instead, the job failed with the following error:
FetchFailed(BlockManagerId(516, nfjd-hadoop02-node352.jpushoa.com, 7337, None), shuffleId=3, mapId=59, reduceId=917, message=
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:941)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.(SortMergeJoinExec.scala:811)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:326)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
    ... 1 more
You can see this is an out-of-direct-memory error triggered while fetching data in the shuffle stage.
When I first saw this error I was baffled: by my understanding, increasing the number of shuffle partitions should reduce the amount of data each task handles (and the Spark web UI confirmed exactly that). So why would smaller per-task data cause a fetch failure during Shuffle Read?
That question led to the following dig into the underlying mechanics:
Spark Shuffle Read happens in the reduce-task stage: the executor running the reduce task asks the driver for the shuffle block metadata (including each block's size and which executor holds it), and then goes to fetch the blocks. There is an important mechanism here:
when the block to fetch is large, it is streamed to local disk rather than loaded into memory all at once, which prevents memory overflow. But when the block size is below a certain threshold, the whole block is read into memory for sorting and other operations. So what is that threshold?
It is spark.maxRemoteBlockSizeFetchToMem, whose default is Int.MaxValue - 512, i.e. 2147483135 bytes, roughly 2 GB. The relevant source:
// When the BlockManager is initialized, the configured
// spark.maxRemoteBlockSizeFetchToMem is stored in a member variable.
private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

/**
 * Get block from remote block managers as serialized bytes.
 */
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")
  var runningFailureCount = 0
  var totalFailureCount = 0
  // Ask the driver's BlockManagerMaster for the block's locations and status.
  val locationsAndStatus = master.getLocationsAndStatus(blockId)
  val blockSize = locationsAndStatus.map { b => b.status.diskSize.max(b.status.memSize) }.getOrElse(0L)
  val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
  // Compare the reported block size against spark.maxRemoteBlockSizeFetchToMem
  // to decide whether to stream (spill) the block to disk or read it straight into memory.
  val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
    remoteBlockTempFileManager
  } else {
    null
  }
At first glance it looked like this threshold was simply too large: if several of my blocks were each over 1 GB, reading them into memory at once would certainly overflow direct memory. So the next step was to set spark.maxRemoteBlockSizeFetchToMem to 10m, forcing every remote block to be streamed to disk, which should have avoided the error. Well... unfortunately, it did not work. The job still failed with exactly the same error. I was completely stuck (for a whole day).
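For reference, the configuration tried at this point can be written out as plain conf key/value pairs (a sketch of what would be passed via --conf on spark-submit; the values are the ones from this article):

```python
# The two settings in play so far in this article, as spark-submit conf pairs.
conf = {
    # Stream any remote block larger than 10 MB to disk instead of memory.
    "spark.maxRemoteBlockSizeFetchToMem": "10m",
    # The raised partition count that triggered the failure.
    "spark.sql.shuffle.partitions": "2700",
}

for key, value in sorted(conf.items()):
    print(f"--conf {key}={value}")
```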
I kept thinking. This was genuinely puzzling: why did the error persist even when fetches were streamed? After a lot of reading, I narrowed the problem down to the block size itself, since that was the only thing that could differ. So I read the write path used by the sort-merge join in the current execution plan, SortShuffleWriter.scala, which writes the shuffle data and records each block's size:
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    // Build a MapStatus reporting where this map task's output lives and how large each block is.
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
Stepping into MapStatus from the code above:
private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
  .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
  .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}
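The selection above can be sketched in plain Python (the names mirror Spark's but this is an illustration, not Spark's API):

```python
# spark.shuffle.minNumPartitionsToHighlyCompress (Spark default)
MIN_PARTS_TO_HIGHLY_COMPRESS = 2000

def choose_map_status(uncompressed_sizes):
    """Mirror of MapStatus.apply: pick the status class by partition count."""
    if len(uncompressed_sizes) > MIN_PARTS_TO_HIGHLY_COMPRESS:
        return "HighlyCompressedMapStatus"  # only an average + a few exact sizes
    return "CompressedMapStatus"            # exact size of every block

print(choose_map_status([1024] * 500))   # → CompressedMapStatus
print(choose_map_status([1024] * 2700))  # → HighlyCompressedMapStatus
```

At the original 500 partitions the job stayed on the exact-size path; raising the count to 2700 crossed the 2000 threshold and silently changed how block sizes were reported.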
The decision hinges on the parameter SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS (configuration key spark.shuffle.minNumPartitionsToHighlyCompress, and there it is: a threshold of 2000). Based on the number of shuffle partitions, Spark chooses between HighlyCompressedMapStatus and CompressedMapStatus.
HighlyCompressedMapStatus: as the name suggests, a highly compressed MapStatus, used when there are more than 2000 shuffle partitions. It records only an average block size, plus the exact sizes of blocks exceeding a threshold (configuration: spark.shuffle.accurateBlockThreshold).
CompressedMapStatus: a MapStatus that records the size of every block.
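This is where a skewed dataset bites: blocks below the accurate threshold are all reported at the average, so a moderately large skewed block can be reported as tiny and get fetched straight into direct memory. A toy illustration (the size model is simplified and the numbers are invented):

```python
MB = 1024 * 1024
ACCURATE_BLOCK_THRESHOLD = 100 * MB  # spark.shuffle.accurateBlockThreshold default
FETCH_TO_MEM_LIMIT = 10 * MB         # the 10m spark.maxRemoteBlockSizeFetchToMem set above

def reported_size(actual, avg):
    # Simplified HighlyCompressedMapStatus behavior: keep exact sizes only
    # above the accurate threshold; report everything else as the average.
    return actual if actual > ACCURATE_BLOCK_THRESHOLD else avg

# 2699 small 1 MB blocks plus one skewed 90 MB block.
sizes = [1 * MB] * 2699 + [90 * MB]
avg = sum(sizes) // len(sizes)

skewed_reported = reported_size(90 * MB, avg)
print(avg // MB)                             # → 1  (the average is ~1 MB)
print(skewed_reported > FETCH_TO_MEM_LIMIT)  # → False: fetched into memory anyway
```

The 90 MB block is under the 100 MB accurate threshold, so it is reported at roughly 1 MB, which sails under the 10 MB fetch-to-memory limit; many such underreported blocks fetched concurrently can exhaust the 2 GB netty direct-memory pool seen in the stack trace.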
Mystery solved: I had never imagined that crossing 2000 shuffle partitions would trigger such a change in behavior.
Conclusion:
When the number of shuffle partitions exceeds 2000, Spark, as an optimization, no longer records the size of every block produced in the map stage; it switches to HighlyCompressedMapStatus. This is controlled by spark.shuffle.minNumPartitionsToHighlyCompress (default 2000).
HighlyCompressedMapStatus still records the exact sizes of blocks above a threshold, controlled by spark.shuffle.accurateBlockThreshold (default 100 * 1024 * 1024 bytes); everything else is reported at the average.
During shuffle read, the reported block size determines whether a block is streamed to disk or loaded into memory at once, controlled by spark.maxRemoteBlockSizeFetchToMem (default Int.MaxValue - 512). With skewed data, the averaged sizes can make large blocks look small enough to be fetched into memory, exhausting direct memory.
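Given this analysis, one direct workaround (my inference, not stated verbatim in the article) is to make the reported block sizes accurate again, either by raising the highly-compress threshold above the partition count or by lowering the accurate-size threshold so skewed blocks keep their exact size:

```python
# Conf tweaks that follow from the analysis above; the exact values are
# illustrative and should be tuned per job.
conf = {
    # Keep CompressedMapStatus (exact per-block sizes) even at 2700 partitions;
    # trades some driver memory for size accuracy.
    "spark.shuffle.minNumPartitionsToHighlyCompress": "3000",
    # Alternatively: record exact sizes for any block over 10 MB so skewed
    # blocks are not hidden behind the average.
    "spark.shuffle.accurateBlockThreshold": str(10 * 1024 * 1024),
}

for key, value in sorted(conf.items()):
    print(f"--conf {key}={value}")
```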
The fix
Once the cause is known, the problem is easy to address, mainly from two angles: the amount of shuffle data, and the number of partitions processing it.
Reduce the shuffle data
Consider whether a map-side join or broadcast join could avoid the shuffle altogether.
Filter out unnecessary data before the shuffle. For example, if the raw data has 20 columns but you only need a few, select just those columns; this cuts a fair amount of shuffle data.
For SparkSQL and DataFrame join, group by, etc.
Control the partition count with spark.sql.shuffle.partitions (default 200); raise it according to the shuffle volume and the complexity of the computation.
For RDD join, groupBy, reduceByKey, etc.
Control the number of partitions for shuffle read and reduce processing with spark.default.parallelism. It defaults to the total number of cores running the job (8 in Mesos fine-grained mode, the local core count in local mode); the official recommendation is 2-3x the number of cores.
Increase executor memory
Raise spark.executor.memory to an appropriate value.
Check for data skew
Have null keys been filtered out? Can anomalous data (a single key with an outsized amount of data) be handled separately? Consider changing the partitioning scheme.
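One common way to change the partitioning for a hot key, not spelled out in the article, is key salting: append a random suffix so a single hot key spreads across several partitions, aggregate the salted keys, then strip the salt and combine. A minimal pure-Python sketch of the idea (the helper names are mine):

```python
import random

NUM_SALTS = 4  # how many ways to split each hot key (illustrative)

def salt_key(key):
    # Spread one hot key across NUM_SALTS synthetic keys.
    return f"{key}#{random.randrange(NUM_SALTS)}"

def unsalt_key(salted):
    # Recover the original key after the first aggregation pass.
    return salted.rsplit("#", 1)[0]

random.seed(0)
records = [("hot_key", 1)] * 10 + [("rare_key", 1)]
salted = [(salt_key(k), v) for k, v in records]

# First pass: aggregate by salted key, so the skew is spread over partitions.
partial = {}
for k, v in salted:
    partial[k] = partial.get(k, 0) + v

# Second pass: strip the salt and combine the partial results.
final = {}
for k, v in partial.items():
    final[unsalt_key(k)] = final.get(unsalt_key(k), 0) + v

print(final)  # → {'hot_key': 10, 'rare_key': 1}
```

In Spark the same two-pass pattern applies to reduceByKey or a skewed join side; the salt bounds the size of any one shuffle block at the cost of a second aggregation.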
————————————————
Copyright notice: this is an original article by CSDN blogger 「JaxMa」, licensed under CC 4.0 BY-SA. Please include the original link and this notice when reposting.
原文链接:https://blog.csdn.net/Jaxma/article/details/106827482



