Wednesday, November 12, 2014

ShuffleError Caused By 'java.lang.OutOfMemoryError: Java heap space' When Running A MapReduce Application

When executing a MapReduce application, a ShuffleError is thrown:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#50
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
    at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
    at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

In Shuffle.run() (code snippet below), a set of Fetcher threads is launched to do the shuffle work; their number is decided by the 'mapreduce.reduce.shuffle.parallelcopies' property in mapred-site.xml:
    // Start the map-output fetcher threads
    final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
                                     reporter, metrics, this,
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }

    // Wait for shuffle to complete successfully
    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
      reporter.progress();
  
      synchronized (this) {
        if (throwable != null) {
          throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                 throwable);
        }
      }
    }
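
For reference, the fetcher count can be tuned in mapred-site.xml. A minimal, illustrative fragment (the value shown is the Hadoop default, not a recommendation for any particular cluster):

```xml
<!-- mapred-site.xml: number of parallel fetcher threads per reduce task -->
<property>
  <name>mapreduce.reduce.shuffle.parallelcopies</name>
  <value>5</value>
</property>
```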

From the bottom of the stacktrace, we can see that the OOM is thrown along the path Fetcher.copyFromHost() → Fetcher.copyMapOutput() → MergeManagerImpl.reserve(). In MergeManagerImpl.reserve(), we can see that there are two shuffle modes: on-disk and in-memory.
  @Override
  public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
                                             long requestedSize,
                                             int fetcher
                                             ) throws IOException {
    if (!canShuffleToMemory(requestedSize)) {
      LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
               " is greater than maxSingleShuffleLimit (" +
               maxSingleShuffleLimit + ")");
      return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
                                      jobConf, mapOutputFile, fetcher, true);
    }

    if (usedMemory > memoryLimit) {
      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
          + ") is greater than memoryLimit (" + memoryLimit + ")." +
          " CommitMemory is (" + commitMemory + ")");
      return null;
    }

    // Allow the in-memory shuffle to progress
    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
        + "CommitMemory is (" + commitMemory + ")");
    return unconditionalReserve(mapId, requestedSize, true);
}

...

private boolean canShuffleToMemory(long requestedSize) {
    return (requestedSize < maxSingleShuffleLimit);
}

If we track down where 'maxSingleShuffleLimit' comes from, we find the following code snippet, which is well explained by its comments:
//memoryLimit is configured by ('mapreduce.reduce.memory.mb'*'mapreduce.reduce.shuffle.input.buffer.percent')
//singleShuffleMemoryLimitPercent is configured by 'mapreduce.reduce.shuffle.memory.limit.percent'
this.maxSingleShuffleLimit =
    (long)(memoryLimit * singleShuffleMemoryLimitPercent);
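
To make these thresholds concrete, here is a small standalone sketch that plugs in the values quoted at the end of this post (3072 MB, 0.7) plus the default mapreduce.reduce.shuffle.memory.limit.percent of 0.25. It simplifies by assuming the reducer's max heap equals mapreduce.reduce.memory.mb, which is not exactly how memoryLimit is derived on a real cluster:

```java
public class ShuffleLimits {
    // Values quoted in this post; assumed defaults, not read from any cluster.
    static final long REDUCE_MEMORY_BYTES = 3072L * 1024 * 1024; // mapreduce.reduce.memory.mb
    static final double INPUT_BUFFER_PERCENT = 0.70;             // mapreduce.reduce.shuffle.input.buffer.percent
    static final double SINGLE_SHUFFLE_LIMIT_PERCENT = 0.25;     // mapreduce.reduce.shuffle.memory.limit.percent

    // Total heap budget for in-memory map outputs
    static long memoryLimit() {
        return (long) (REDUCE_MEMORY_BYTES * INPUT_BUFFER_PERCENT);
    }

    // Largest single map output allowed to stay in memory
    static long maxSingleShuffleLimit() {
        return (long) (memoryLimit() * SINGLE_SHUFFLE_LIMIT_PERCENT);
    }

    // Mirrors MergeManagerImpl.canShuffleToMemory()
    static boolean canShuffleToMemory(long requestedSize) {
        return requestedSize < maxSingleShuffleLimit();
    }

    public static void main(String[] args) {
        System.out.println("memoryLimit           = " + memoryLimit());           // ~2150 MB
        System.out.println("maxSingleShuffleLimit = " + maxSingleShuffleLimit()); // ~537 MB
        // A 600 MB map output exceeds the single-shuffle limit, so it is spilled to disk
        System.out.println("600 MB in memory? " + canShuffleToMemory(600L * 1024 * 1024));
    }
}
```

With these numbers, any map output larger than roughly 537 MB is forced onto disk, while smaller outputs compete for the shared in-memory budget.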

Consequently, the product of 'mapreduce.reduce.shuffle.input.buffer.percent' and 'mapreduce.reduce.shuffle.parallelcopies' should not be greater than 1; otherwise, in the extreme case where all mappers complete at the same time and all outputs are shuffled in memory, an OOM will be thrown.

But there is a trade-off here between OOM risk and throughput: the product can be allowed to exceed 1 if an occasional OOM is tolerable.
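A quick back-of-the-envelope check with the default values (a sketch; the first bound is the rule of thumb stated above, the second folds in mapreduce.reduce.shuffle.memory.limit.percent, whose role the comments on this post debate):

```java
public class ShuffleBudget {
    // Defaults quoted in this post; assumed values, not measured on a cluster.
    static final double INPUT_BUFFER_PERCENT = 0.70; // mapreduce.reduce.shuffle.input.buffer.percent
    static final int PARALLEL_COPIES = 5;            // mapreduce.reduce.shuffle.parallelcopies
    static final double MEMORY_LIMIT_PERCENT = 0.25; // mapreduce.reduce.shuffle.memory.limit.percent

    // Pessimistic bound: each fetcher may hold up to a full buffer's worth of
    // data at once, so the fraction of heap at risk is buffer% * copies.
    static double pessimisticBound() {
        return INPUT_BUFFER_PERCENT * PARALLEL_COPIES;
    }

    // Tighter bound: canShuffleToMemory() caps each in-memory fetch at
    // buffer% * limit% of the heap, so the aggregate is buffer% * limit% * copies.
    static double perFetchBound() {
        return INPUT_BUFFER_PERCENT * MEMORY_LIMIT_PERCENT * PARALLEL_COPIES;
    }

    public static void main(String[] args) {
        System.out.println("buffer% * copies          = " + pessimisticBound()); // ~3.5, > 1: risky
        System.out.println("buffer% * limit% * copies = " + perFetchBound());    // ~0.875, < 1: safe
    }
}
```

Under the pessimistic reading the defaults overcommit the heap by roughly 3.5x, while the tighter per-fetch cap keeps the worst case under the heap budget.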

P.S. The descriptions of the related properties in this post are as follows:

  mapreduce.reduce.shuffle.parallelcopies = 5
    The default number of parallel transfers run by reduce during the copy (shuffle) phase.
  mapreduce.reduce.memory.mb = 3072
    Larger resource limit for reduces.
  mapreduce.reduce.shuffle.input.buffer.percent = 0.7
    The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.

© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If reposting, please annotate the origin: Jason4Zhu

4 comments:

  1. I have seen a similar issue and have the following doubts.

    How can "the product of 'mapreduce.reduce.shuffle.input.buffer.percent' and 'mapreduce.reduce.shuffle.parallelcopies' should not be greater than 1" cause OOM? And

    "in the extreme case, OOM will be thrown provided all mappers complete at the same time and all the outputs are shuffled in memory" - which case?

    Why can't we keep 'mapreduce.reduce.shuffle.input.buffer.percent' = 0.2 by default, as per the above formula?

  2. To respond to the above comment: I think the author missed the "mapreduce.reduce.shuffle.memory.limit.percent" variable, which is 0.25 by default.
    So the calculation would be mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.parallelcopies < 1.

    See also the original issue : https://issues.apache.org/jira/browse/MAPREDUCE-6447

    Replies
    1. The commenter has not read the code that uses mapreduce.reduce.shuffle.memory.limit.percent (like most commenters on the original issue) - it only affects whether RAM or disk is used for shuffling. The writer of the post was correct in not including that configuration flag in the formula.

    2. I agree with Bogdan that the author might have missed a variable. From what I understand, each request to reserve memory is tested against maxSingleShuffleLimit, which is mapreduce.reduce.memory.mb * mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent. If the requestedSize is greater than maxSingleShuffleLimit, the shuffle is done on disk, so each in-memory shuffle cannot exceed maxSingleShuffleLimit. The number of threads doing the shuffle is mapreduce.reduce.shuffle.parallelcopies, so assuming all shuffle work is done in memory, the maximum memory consumed should be less than maxSingleShuffleLimit * mapreduce.reduce.shuffle.parallelcopies, which is mapreduce.reduce.memory.mb * mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.parallelcopies.
      That should not exceed the memory available to the reducer, so mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.parallelcopies should be less than 1.
