When executing a MapReduce application, a ShuffleError is thrown:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#50
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
    at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
In Shuffle.run (code snippet shown below), a set of Fetcher threads is launched to perform the shuffle work; the number of threads is determined by the 'mapreduce.reduce.shuffle.parallelcopies' parameter in mapred-site.xml:
// Start the map-output fetcher threads
final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
for (int i = 0; i < numFetchers; ++i) {
  fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
                                 reporter, metrics, this,
                                 reduceTask.getShuffleSecret());
  fetchers[i].start();
}

// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
  reporter.progress();

  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName,
                             throwable);
    }
  }
}
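The same key can also be set per job on the job's Configuration rather than cluster-wide in mapred-site.xml. Below is a minimal sketch of that; the class name, job name, and the value 10 are assumptions for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleTuningExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same key that Shuffle.run reads via MRJobConfig.SHUFFLE_PARALLEL_COPIES
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
    Job job = Job.getInstance(conf, "shuffle-tuning-example");
    // Print the value the reduce tasks will see (the default is 5)
    System.out.println(job.getConfiguration()
        .getInt("mapreduce.reduce.shuffle.parallelcopies", 5));
  }
}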
Reading the stack trace from the bottom up, we can see that the OOM is thrown along the call path Fetcher.copyFromHost() → Fetcher.copyMapOutput() → MergeManagerImpl.reserve(). In MergeManagerImpl.reserve(), we can see that there are two shuffle paths, on-disk and in-memory: a map output larger than maxSingleShuffleLimit is shuffled to disk, the fetch stalls when usedMemory already exceeds memoryLimit, and otherwise heap memory is reserved for an in-memory shuffle.
@Override
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
                                           long requestedSize,
                                           int fetcher
                                           ) throws IOException {
  if (!canShuffleToMemory(requestedSize)) {
    LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
             " is greater than maxSingleShuffleLimit (" +
             maxSingleShuffleLimit + ")");
    return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
                                    jobConf, mapOutputFile, fetcher, true);
  }

  if (usedMemory > memoryLimit) {
    LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
        + ") is greater than memoryLimit (" + memoryLimit + ")." +
        " CommitMemory is (" + commitMemory + ")");
    return null;
  }

  // Allow the in-memory shuffle to progress
  LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
      + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
      + "CommitMemory is (" + commitMemory + ")");
  return unconditionalReserve(mapId, requestedSize, true);
}

...

private boolean canShuffleToMemory(long requestedSize) {
  return (requestedSize < maxSingleShuffleLimit);
}
If we track down where this 'maxSingleShuffleLimit' comes from, we find the following snippet, which is well explained by its comments:
// memoryLimit is configured by ('mapreduce.reduce.memory.mb' * 'mapreduce.reduce.shuffle.input.buffer.percent')
// singleShuffleMemoryLimitPercent is configured by 'mapreduce.reduce.shuffle.memory.limit.percent'
this.maxSingleShuffleLimit = (long)(memoryLimit * singleShuffleMemoryLimitPercent);
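To make the limit concrete, here is a small back-of-the-envelope sketch plugging in the values from the table below (3072 MB reduce memory, 0.7 input buffer percent) together with the default 0.25 for singleShuffleMemoryLimitPercent; all three numbers are illustrative assumptions, not readings from a live cluster:

public class MaxSingleShuffleLimitEstimate {
  public static void main(String[] args) {
    long heapBytes = 3072L * 1024 * 1024;    // mapreduce.reduce.memory.mb = 3072
    double inputBufferPercent = 0.7;         // mapreduce.reduce.shuffle.input.buffer.percent
    double singleShuffleLimitPercent = 0.25; // mapreduce.reduce.shuffle.memory.limit.percent (default)

    // Mirrors the snippet above
    long memoryLimit = (long) (heapBytes * inputBufferPercent);
    long maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleLimitPercent);

    System.out.println("memoryLimit           = " + memoryLimit / (1024 * 1024) + " MB");           // ~2150 MB
    System.out.println("maxSingleShuffleLimit = " + maxSingleShuffleLimit / (1024 * 1024) + " MB"); // ~537 MB
  }
}

Under these assumptions, any single map output larger than roughly 537 MB is shuffled to disk, while smaller ones may be held in memory.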
Consequently, the product of 'mapreduce.reduce.shuffle.input.buffer.percent' and 'mapreduce.reduce.shuffle.parallelcopies' should not be greater than 1; otherwise, in the extreme case where all mappers complete at the same time and all of their outputs are shuffled in memory, an OOM will be thrown.
This is a compromise between OOM risk and throughput, however: the product can be greater than 1 if occasional OOMs are tolerable.
P.S. The parameters referenced in this post are described as follows:
name | value | description
---- | ----- | -----------
mapreduce.reduce.shuffle.parallelcopies | 5 | The default number of parallel transfers run by reduce during the copy (shuffle) phase.
mapreduce.reduce.memory.mb | 3072 | Larger resource limit for reduces.
mapreduce.reduce.shuffle.input.buffer.percent | 0.7 | The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.
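As a quick sanity check of the rule above against these table values (again, illustrative numbers rather than a live configuration):

public class ShuffleProductCheck {
  public static void main(String[] args) {
    double inputBufferPercent = 0.7; // mapreduce.reduce.shuffle.input.buffer.percent
    int parallelCopies = 5;          // mapreduce.reduce.shuffle.parallelcopies

    // The post's rule of thumb: this product should not exceed 1
    double product = inputBufferPercent * parallelCopies; // 3.5
    System.out.printf("product = %.1f -> %s%n", product,
        product > 1.0
            ? "OOM possible if all fetchers shuffle in memory simultaneously"
            : "worst-case in-memory shuffle fits in the buffer");
  }
}

With these values the product is 3.5, which is exactly the at-risk situation the post warns about.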
© 2014-2017 jason4zhu.blogspot.com All Rights Reserved
If reposting, please cite the origin: Jason4Zhu
I have seen a similar issue and have the following doubts.
How can "the product of 'mapreduce.reduce.shuffle.input.buffer.percent' and 'mapreduce.reduce.shuffle.parallelcopies' should not be greater than 1" cause OOM? And
"in the extreme case, OOM will be thrown provided all mappers complete at the same time and all the outputs are shuffled in memory" refers to which case?
Why can't we keep 'mapreduce.reduce.shuffle.input.buffer.percent' = 0.2 by default, as per the above formula?
To respond to the above comment: I think the author missed the "mapreduce.reduce.shuffle.memory.limit.percent" variable, which is 0.25 by default.
So the calculation would be mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.parallelcopies < 1.
See also the original issue: https://issues.apache.org/jira/browse/MAPREDUCE-6447
The commenter has not read the code that uses mapreduce.reduce.shuffle.memory.limit.percent (like most commenters on the original issue): it only affects whether RAM or disk is used for shuffling. The writer of the post was correct in not including that configuration flag in the formula.
I agree with Bogdan that the author might have missed a variable. From what I understand, each request to reserve memory is tested against maxSingleShuffleLimit, which is mapreduce.reduce.memory.mb * mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent. If the requestedSize is greater than maxSingleShuffleLimit, the shuffle is done on disk, so each shuffle done in memory cannot exceed maxSingleShuffleLimit. The number of threads doing the shuffle is mapreduce.reduce.shuffle.parallelcopies, so assuming all shuffle work is done in memory, the maximum memory consumed should be less than maxSingleShuffleLimit * mapreduce.reduce.shuffle.parallelcopies, which is mapreduce.reduce.memory.mb * mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.parallelcopies.
It should not exceed the memory available to the reducer, so mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.parallelcopies should be less than 1.
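For completeness, a quick numeric check of the bound proposed in this thread, using the defaults cited above (0.7, 0.25, 5; assumed values for illustration):

public class CommentersBoundCheck {
  public static void main(String[] args) {
    double inputBufferPercent = 0.7;  // mapreduce.reduce.shuffle.input.buffer.percent
    double memoryLimitPercent = 0.25; // mapreduce.reduce.shuffle.memory.limit.percent
    int parallelCopies = 5;           // mapreduce.reduce.shuffle.parallelcopies

    // The commenters' proposed bound on worst-case in-memory shuffle usage
    double bound = inputBufferPercent * memoryLimitPercent * parallelCopies;
    System.out.printf("0.7 * 0.25 * 5 = %.3f, which is %s 1%n",
        bound, bound < 1 ? "less than" : "at least");
  }
}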