Wednesday, November 5, 2014

Fair Scheduler In YARN, Hadoop-2.2.0 - Deep Into Code

In this post, we will walk through the source code of the Fair Scheduler, highlight the key points, and build a picture of how the Fair Scheduler works.

Step 1: Create FairScheduler


Firstly, YARN checks the parameter 'yarn.resourcemanager.scheduler.class' to decide which scheduler will be applied.
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

The relevant code is in 'org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.createScheduler()':
protected ResourceScheduler createScheduler() {
  // Read parameter 'yarn.resourcemanager.scheduler.class' to get the specific scheduler class name.
  String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
      YarnConfiguration.DEFAULT_RM_SCHEDULER);
  try {
    Class<?> schedulerClazz = Class.forName(schedulerClassName);
    return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
        this.conf);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Scheduler: "
        + schedulerClassName, e);
  }
}

After the FairScheduler is created, its reinitialize() method is invoked from 'org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.serviceInit()':
try {
  this.scheduler.reinitialize(conf, this.rmContext);
} catch (IOException ioe) {
  throw new RuntimeException("Failed to initialize scheduler", ioe);
}

Step 2: Main Logic Of FairScheduler


For FairScheduler.reinitialize():
@Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
    throws IOException {
  // Initialize FairScheduler-related parameters from yarn-site.xml
  userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
  nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
  rackLocalityThreshold = this.conf.getLocalityThresholdRack();
  preemptionEnabled = this.conf.getPreemptionEnabled();
  preemptionInterval = this.conf.getPreemptionInterval();
  waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
  // ...
 
  if (!initialized) {
    //Initialize QueueManager
    queueMgr.initialize();

    //Initialize UpdateThread
    Thread updateThread = new Thread(new UpdateThread());
    updateThread.setName("FairSchedulerUpdateThread");
    updateThread.setDaemon(true);
    updateThread.start();
  } else {
    //Reload Allocation file (fair-scheduler.xml)
    queueMgr.reloadAllocs();
  }
}
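The getters above read FairScheduler-specific keys from yarn-site.xml. Below is a sketch of the corresponding properties (key names per FairSchedulerConfiguration in Hadoop-2.2.0; the values here are only illustrative, not the defaults):
<property>
  <name>yarn.scheduler.fair.user-as-default-queue</name>
  <value>true</value>
</property>
<property>
  <name>yarn.scheduler.fair.locality.threshold.node</name>
  <value>0.5</value>
</property>
<property>
  <name>yarn.scheduler.fair.locality.threshold.rack</name>
  <value>0.5</value>
</property>
<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>
<property>
  <!-- milliseconds between preemption checks -->
  <name>yarn.scheduler.fair.preemptionInterval</name>
  <value>5000</value>
</property>
<property>
  <!-- milliseconds between the preemption warning and the kill -->
  <name>yarn.scheduler.fair.waitTimeBeforeKill</name>
  <value>15000</value>
</property>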

UpdateThread is a daemon thread that calls FairScheduler.update() and the preemption check every UPDATE_INTERVAL milliseconds (hard-coded to 500 ms in Hadoop-2.2.0).
/**
 * A runnable which calls {@link FairScheduler#update()} every
 * <code>UPDATE_INTERVAL</code> milliseconds.
 */
private class UpdateThread implements Runnable {
  public void run() {
    while (true) {
      try {
        Thread.sleep(UPDATE_INTERVAL);
        //Call FairScheduler.update()
        update();
        // The entry point for preemption
        preemptTasksIfNecessary();
      } catch (Exception e) {
        LOG.error("Exception in fair scheduler UpdateThread", e);
      }
    }
  }
}

Going deeper into FairScheduler.update(), several things happen here: the allocation file (fair-scheduler.xml) is reloaded if necessary, the demand of each queue and of the applications within each queue is computed recursively, and the fair share of each queue is recomputed recursively.
/**
 * Recompute the internal variables used by the scheduler - per-job weights,
 * fair shares, deficits, minimum slot allocations, and amount of used and
 * required resources per job.
 */
protected synchronized void update() {
  queueMgr.reloadAllocsIfNecessary(); // Reload allocation file
  updateRunnability(); // Set job runnability based on user/queue limits
  updatePreemptionVariables(); // Determine if any queues merit preemption

  FSQueue rootQueue = queueMgr.getRootQueue();

  // Recursively update demands for all queues
  rootQueue.updateDemand();

  rootQueue.setFairShare(clusterCapacity);
  // Recursively compute fair shares for all queues
  // and update metrics
  rootQueue.recomputeShares();
}

Step 3: Main Logic Of QueueManager


QueueManager's job is to load and maintain all queue information, which is configured in the allocation file (fair-scheduler.xml).

QueueManager.initialize() is shown below; in it, reloadAllocs() calls QueueManager.loadQueue() recursively (to support hierarchical queues) in order to load all the queues.
public void initialize() throws IOException, SAXException,
    AllocationConfigurationException, ParserConfigurationException {
  FairSchedulerConfiguration conf = scheduler.getConf();
  // Create queue 'root'
  rootQueue = new FSParentQueue("root", this, scheduler, null);
  queues.put(rootQueue.getName(), rootQueue);
 
  this.allocFile = conf.getAllocationFile();
 
  // Load fair-scheduler.xml
  reloadAllocs();
  lastSuccessfulReload = scheduler.getClock().getTime();
  lastReloadAttempt = scheduler.getClock().getTime();

  // Create the default queue
  getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}
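To make the hierarchical loading concrete, here is a minimal fair-scheduler.xml sketch (the queue names and numbers are made up); the nested <queue> element is what drives the recursion in loadQueue():
<?xml version="1.0"?>
<allocations>
  <!-- 'engineering' has a child queue, so it is loaded as an FSParentQueue -->
  <queue name="engineering">
    <minResources>4096 mb,4 vcores</minResources>
    <maxResources>16384 mb,16 vcores</maxResources>
    <weight>2.0</weight>
    <!-- 'spark' has no children, so it becomes an FSLeafQueue -->
    <queue name="spark">
      <minSharePreemptionTimeout>60</minSharePreemptionTimeout>
    </queue>
  </queue>
  <!-- seconds below half of fair share before fair-share preemption kicks in -->
  <fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
</allocations>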

Leaf queues, non-leaf queues (those with one or more child queues), and applications are abstracted as FSLeafQueue, FSParentQueue, and AppSchedulable respectively, all of which extend 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable'. This is clearly the Composite design pattern; a sketch follows.
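Here is a minimal, self-contained sketch of that Composite structure. The 'Simple*' names are made up for illustration, and demand is reduced to a plain int (MB); the real Schedulable carries many more members:
import java.util.ArrayList;
import java.util.List;

abstract class SimpleSchedulable {                    // stands in for Schedulable
  abstract int updateDemand();                        // demand in MB, for brevity
}

class SimpleParentQueue extends SimpleSchedulable {   // cf. FSParentQueue
  final List<SimpleSchedulable> children = new ArrayList<SimpleSchedulable>();
  int updateDemand() {
    int demand = 0;
    for (SimpleSchedulable child : children) {
      demand += child.updateDemand();                 // recurse down the tree
    }
    return demand;
  }
}

class SimpleApp extends SimpleSchedulable {           // cf. AppSchedulable
  final int outstanding;
  SimpleApp(int outstanding) { this.outstanding = outstanding; }
  int updateDemand() { return outstanding; }          // leaf: its own needs
}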

Regarding the rootQueue.updateDemand() call above, the code shows that updateDemand() propagates from FSParentQueue to FSLeafQueue and finally down to the applications.
// FSParentQueue
@Override
public void updateDemand() {
  // Compute demand by iterating through apps in the queue
  // Limit demand to maxResources
  Resource maxRes = queueMgr.getMaxResources(getName());
  demand = Resources.createResource(0);
  // Iterate through child queues, adding each child's demand to this queue's demand.
  for (FSQueue childQueue : childQueues) {
    childQueue.updateDemand();
    Resource toAdd = childQueue.getDemand();
    demand = Resources.add(demand, toAdd);
    demand = Resources.componentwiseMin(demand, maxRes);
    if (Resources.equals(demand, maxRes)) {
      break;
    }
  }
}

// FSLeafQueue
@Override
public void updateDemand() {
  // Compute demand by iterating through apps in the queue
  // Limit demand to maxResources
  Resource maxRes = queueMgr.getMaxResources(getName());
  demand = Resources.createResource(0);
  // Iterate through applications, adding each app's demand to this queue's demand.
  for (AppSchedulable sched : appScheds) {
    sched.updateDemand();
    Resource toAdd = sched.getDemand();
    demand = Resources.add(demand, toAdd);
    demand = Resources.componentwiseMin(demand, maxRes);
    if (Resources.equals(demand, maxRes)) {
      break;
    }
  }
}

// AppSchedulable
@Override
public void updateDemand() {
  // calculate demand
  demand = Resources.createResource(0);
  // Demand is current consumption plus outstanding requests
  Resources.addTo(demand, app.getCurrentConsumption());

  // Add up outstanding resource requests
  for (Priority p : app.getPriorities()) {
    for (ResourceRequest r : app.getResourceRequests(p).values()) {
      Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
      Resources.addTo(demand, total);
    }
  }
}
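To see that arithmetic with concrete, made-up numbers (plain megabyte counts instead of the Resource API):
// Hypothetical app: 4096 MB currently allocated, plus two outstanding requests.
int demand = 4096;                               // app.getCurrentConsumption()
int[][] requests = { {1024, 3}, {2048, 1} };     // {capability in MB, numContainers}
for (int[] r : requests) {
  demand += r[0] * r[1];                         // 3*1024 + 1*2048 = 5120
}
// demand == 9216 MB: current consumption plus all outstanding requests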

Likewise, rootQueue.recomputeShares() follows almost the same design pattern and logic as rootQueue.updateDemand().
// FSParentQueue
@Override
public void recomputeShares() {
  // Compute the fair share of each child queue based on this queue's fair share.
  policy.computeShares(childQueues, getFairShare());
  for (FSQueue childQueue : childQueues) {
    childQueue.getMetrics().setFairShare(childQueue.getFairShare());
    // Invoke recomputeShares() on each child queue.
    childQueue.recomputeShares();
  }
}

// FSLeafQueue
@Override
public void recomputeShares() {
  // Compute the fair share of each application based on this queue's fair share.
  policy.computeShares(getAppSchedulables(), getFairShare());
}

The SchedulingPolicy mentioned above, whose purpose is to compute a specific resource amount (for the FairScheduler, the fair share), has three implementation classes: DominantResourceFairnessPolicy, FairSharePolicy, and FifoPolicy.
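As a rough illustration of what computeShares() does under FairSharePolicy, here is a deliberately naive sketch that splits a parent's share among children by weight and caps each at its demand. The real implementation (ComputeFairShares) instead binary-searches a weight-to-resource ratio so that min shares, max shares, and demands are all honored and surplus is redistributed; treat this as a sketch, not the actual algorithm.
import java.util.LinkedHashMap;
import java.util.Map;

class NaiveFairShare {
  /** queues: name -> {weight, demand in MB}; returns name -> share in MB.
   *  Assumes all weights are positive. */
  static Map<String, Integer> computeShares(Map<String, double[]> queues,
                                            int capacity) {
    double totalWeight = 0;
    for (double[] q : queues.values()) {
      totalWeight += q[0];
    }
    Map<String, Integer> shares = new LinkedHashMap<String, Integer>();
    for (Map.Entry<String, double[]> e : queues.entrySet()) {
      double byWeight = capacity * e.getValue()[0] / totalWeight;
      // Cap at demand; unlike the real policy, surplus is NOT redistributed here.
      shares.put(e.getKey(), (int) Math.min(byWeight, e.getValue()[1]));
    }
    return shares;
  }
}
For example, with capacity 10240 MB and queues A {weight 2, demand 9000} and B {weight 1, demand 2000}, this sketch gives A about 6826 MB and B 2000 MB; the real policy would hand B's unused portion back to A.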


Step 4: Preemption In Fair Scheduler


The entry point is 'FairScheduler.preemptTasksIfNecessary()'.
/**
 * Check for queues that need tasks preempted, either because they have been
 * below their guaranteed share for minSharePreemptionTimeout or they have
 * been below half their fair share for the fairSharePreemptionTimeout. If
 * such queues exist, compute how many tasks of each type need to be preempted
 * and then select the right ones using preemptTasks.
 */
protected synchronized void preemptTasksIfNecessary() {
  // Check whether preemption is enabled via parameter 'yarn.scheduler.fair.preemption' in yarn-site.xml
  if (!preemptionEnabled) {
    return;
  }

  long curTime = clock.getTime();
  // Check on the preemption interval
  if (curTime - lastPreemptCheckTime < preemptionInterval) {
    return;
  }
  lastPreemptCheckTime = curTime;

  Resource resToPreempt = Resources.none();

  for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
    // Sum up the amount of resource needed to preempt
    resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); 
  }
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, Resources.none())) {
    // Start to preempt
    preemptResources(queueMgr.getLeafQueues(), resToPreempt);
  }
}

Here, resToPreempt() is the method that computes how much resource a queue is entitled to preempt.
/**
 * Return the resource amount that this queue is allowed to preempt, if any.
 * If the queue has been below its min share for at least its preemption
 * timeout, it should preempt the difference between its current share and
 * this min share. If it has been below half its fair share for at least the
 * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
 * full fair share. If both conditions hold, we preempt the max of the two
 * amounts (this shouldn't happen unless someone sets the timeouts to be
 * identical for some reason).
 */
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
  String queue = sched.getName();
  long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
  long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
  Resource resDueToMinShare = Resources.none();
  Resource resDueToFairShare = Resources.none();
  //Min Share Preemption
  //MAX(0, MIN(minShare, demand)-haveUsed)
  if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, sched.getMinShare(), sched.getDemand());
    resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  //Fair Share Preemption
  //MAX(0, MIN(fairShare, demand)-haveUsed)
  if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, sched.getFairShare(), sched.getDemand());
    resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  //MAX(Min Share Preemption, Fair Share Preemption)
  Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, resDueToMinShare, resDueToFairShare);
 
  return resToPreempt;
}
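Plugging made-up numbers into the min-share branch, MAX(0, MIN(minShare, demand) - haveUsed):
// Hypothetical queue: minShare 10 GB, demand 8 GB, currently using 3 GB (all in MB).
int minShare = 10240, demand = 8192, used = 3072;
int target = Math.min(minShare, demand);            // MIN(minShare, demand) = 8192
int resDueToMinShare = Math.max(0, target - used);  // 8192 - 3072 = 5120
// The queue may preempt up to 5120 MB to reach its (demand-capped) min share.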

The main logic of preemption is at 'FairScheduler.preemptResources()'.
/**
 * Preempt a quantity of resources from a list of QueueSchedulables. The
 * policy for this is to pick apps from queues that are over their fair share,
 * but make sure that no queue is placed below its fair share in the process.
 * We further prioritize preemption by choosing containers with lowest
 * priority to preempt.
 */
protected void preemptResources(Collection<FSLeafQueue> scheds, Resource toPreempt) {
  // Track the app and queue that each candidate container belongs to
  Map<RMContainer, FSSchedulerApp> apps = new HashMap<RMContainer, FSSchedulerApp>();
  Map<RMContainer, FSLeafQueue> queues = new HashMap<RMContainer, FSLeafQueue>();

  // Collect running containers from over-scheduled queues
  List<RMContainer> runningContainers = new ArrayList<RMContainer>();

  for (FSLeafQueue sched : scheds) {
    // Collect the containers of every queue whose resource usage is above its fair share as preemption candidates.
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, sched.getResourceUsage(), sched.getFairShare())) {
      for (AppSchedulable as : sched.getAppSchedulables()) {
        for (RMContainer c : as.getApp().getLiveContainers()) {
          runningContainers.add(c);
          apps.put(c, as.getApp());
          queues.put(c, sched);
        }
      }
    }
  }

  // Sort containers into reverse order of priority
  Collections.sort(runningContainers, new Comparator<RMContainer>() {
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  });
 
  // Scan down the list of containers we've already warned and kill them
  // if we need to.  Remove any containers from the list that we don't need
  // or that are no longer running.
  Iterator<RMContainer> warnedIter = warnedContainers.iterator();
  Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
  while (warnedIter.hasNext()) {
    RMContainer container = warnedIter.next();
    if (container.getState() == RMContainerState.RUNNING &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, toPreempt, Resources.none())) {
      warnOrKillContainer(container, apps.get(container), queues.get(container));
      preemptedThisRound.add(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    } else {
      warnedIter.remove();
    }
  }

  // Scan down the rest of the containers until we've preempted enough, making
  // sure we don't preempt too many from any queue
  Iterator<RMContainer> runningIter = runningContainers.iterator();
  while (runningIter.hasNext() &&
    Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, toPreempt, Resources.none())) {
    RMContainer container = runningIter.next();
    FSLeafQueue sched = queues.get(container);
    if (!preemptedThisRound.contains(container) &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, sched.getResourceUsage(), sched.getFairShare())) {
      warnOrKillContainer(container, apps.get(container), sched);
     
      warnedContainers.add(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    }
  }
}

The final kill logic is in FairScheduler.warnOrKillContainer(). As stated in the official documentation, preemption in Hadoop-2.2.0 is experimental, which can be seen in this method:
private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, FSLeafQueue queue) {
  Long time = app.getContainerPreemptionTime(container);

  if (time != null) {
    // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
    // proceed with kill
    if (time + waitTimeBeforeKill < clock.getTime()) {
      ContainerStatus status =
        SchedulerUtils.createPreemptedContainerStatus(
          container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

      // TODO: Not sure if this ever actually adds this to the list of cleanup
      // containers on the RMNode (see SchedulerNode.releaseContainer()).
      completedContainer(container, status, RMContainerEventType.KILL);
      LOG.info("Killing container" + container +
          " (after waiting for premption for " +
          (clock.getTime() - time) + "ms)");
    }
  } else {
    // track the request in the FSSchedulerApp itself
    app.addPreemption(container, clock.getTime());
  }
}

According to our experiment (related post: Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption), preemption does work in Hadoop-2.2.0.

Lastly, here is a sequence diagram of the essential invocations within the scope of the FairScheduler:



© 2014-2017 jason4zhu.blogspot.com All Rights Reserved
If reposting, please credit the origin: Jason4Zhu
