Series:
- Fair Scheduler In YARN, Hadoop-2.2.0 - Overall Introduction
- Fair Scheduler In YARN, Hadoop-2.2.0 - Deep Into Code
- Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption
In this post, we'll walk through the source code of the Fair Scheduler, pick out the key points, and form an overall picture of how the Fair Scheduler works.
Step 1: Create FairScheduler
Firstly, YARN checks the parameter 'yarn.resourcemanager.scheduler.class' to decide which scheduler to apply:
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
The relevant code is in 'org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.createScheduler()':
protected ResourceScheduler createScheduler() {
  // Read parameter 'yarn.resourcemanager.scheduler.class' to get the
  // specific scheduler class name.
  String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
      YarnConfiguration.DEFAULT_RM_SCHEDULER);
  try {
    Class<?> schedulerClazz = Class.forName(schedulerClassName);
    return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
        this.conf);
  } catch (ClassNotFoundException e) {
    // Class.forName() throws a checked exception; the real method wraps it
    // like this rather than declaring it.
    throw new YarnRuntimeException("Could not instantiate Scheduler: "
        + schedulerClassName, e);
  }
}
After creating the FairScheduler, the FairScheduler.reinitialize() method is invoked from 'org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.serviceInit()':
try {
  this.scheduler.reinitialize(conf, this.rmContext);
} catch (IOException ioe) {
  throw new RuntimeException("Failed to initialize scheduler", ioe);
}
Step 2: Main Logic Of FairScheduler
For FairScheduler.reinitialize():
@Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
    throws IOException {
  // Initialize parameters related to FairScheduler from yarn-site.xml
  userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
  nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
  rackLocalityThreshold = this.conf.getLocalityThresholdRack();
  preemptionEnabled = this.conf.getPreemptionEnabled();
  preemptionInterval = this.conf.getPreemptionInterval();
  waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
  ……
  if (!initialized) {
    // Initialize QueueManager
    queueMgr.initialize();
    // Initialize UpdateThread
    Thread updateThread = new Thread(new UpdateThread());
    updateThread.setName("FairSchedulerUpdateThread");
    updateThread.setDaemon(true);
    updateThread.start();
  } else {
    // Reload Allocation File (fair-scheduler.xml)
    queueMgr.reloadAllocs();
  }
}
UpdateThread is a daemon thread which runs periodically:

/**
 * A runnable which calls {@link FairScheduler#update()} every
 * <code>UPDATE_INTERVAL</code> milliseconds.
 */
private class UpdateThread implements Runnable {
  public void run() {
    while (true) {
      try {
        Thread.sleep(UPDATE_INTERVAL);
        // Call FairScheduler.update()
        update();
        // The entry for preemption
        preemptTasksIfNecessary();
      } catch (Exception e) {
        LOG.error("Exception in fair scheduler UpdateThread", e);
      }
    }
  }
}
Diving into FairScheduler.update(), several things happen here: the Allocation File (fair-scheduler.xml) is reloaded, the demand of each queue and of the applications in each queue is calculated recursively, and the FairShare of each queue is computed recursively.
/**
 * Recompute the internal variables used by the scheduler - per-job weights,
 * fair shares, deficits, minimum slot allocations, and amount of used and
 * required resources per job.
 */
protected synchronized void update() {
  queueMgr.reloadAllocsIfNecessary(); // Reload allocation file
  updateRunnability(); // Set job runnability based on user/queue limits
  updatePreemptionVariables(); // Determine if any queues merit preemption

  FSQueue rootQueue = queueMgr.getRootQueue();

  // Recursively update demands for all queues
  rootQueue.updateDemand();

  rootQueue.setFairShare(clusterCapacity);
  // Recursively compute fair shares for all queues and update metrics
  rootQueue.recomputeShares();
}
Step 3: Main Logic Of QueueManager
QueueManager's job is to load and maintain all queue information, which is configured in the Allocation File (fair-scheduler.xml). QueueManager.initialize() is shown below; in it, reloadAllocs() invokes QueueManager.loadQueue() recursively (supporting hierarchical queues) in order to load all the queues:
public void initialize() throws IOException, SAXException,
    AllocationConfigurationException, ParserConfigurationException {
  FairSchedulerConfiguration conf = scheduler.getConf();
  // Create queue 'root'
  rootQueue = new FSParentQueue("root", this, scheduler, null);
  queues.put(rootQueue.getName(), rootQueue);

  this.allocFile = conf.getAllocationFile();
  // Load fair-scheduler.xml
  reloadAllocs();
  lastSuccessfulReload = scheduler.getClock().getTime();
  lastReloadAttempt = scheduler.getClock().getTime();
  // Create the default queue
  getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}
Leaf queues, non-leaf queues (those with one or more child queues), and applications are abstracted as FSLeafQueue, FSParentQueue, and AppSchedulable, respectively, all of which extend 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable'. This is clearly the Composite design pattern.
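To make the Composite structure concrete, here is a minimal, self-contained sketch of the same shape (the class names SimpleSchedulable/SimpleQueue/SimpleApp are mine for illustration, not the real Hadoop classes): a parent's demand is the recursive sum of its children's demands, exactly the shape of updateDemand() below.

import java.util.ArrayList;
import java.util.List;

// Component: common abstraction for queues and applications.
abstract class SimpleSchedulable {
  abstract int getDemand(); // resource demand, say in MB
}

// Leaf: an application with a fixed demand.
class SimpleApp extends SimpleSchedulable {
  private final int demand;
  SimpleApp(int demand) { this.demand = demand; }
  @Override int getDemand() { return demand; }
}

// Composite: a queue whose demand is the recursive sum of its children.
class SimpleQueue extends SimpleSchedulable {
  private final List<SimpleSchedulable> children = new ArrayList<>();
  void add(SimpleSchedulable child) { children.add(child); }
  @Override int getDemand() {
    int total = 0;
    for (SimpleSchedulable child : children) {
      total += child.getDemand(); // recurses down through sub-queues to apps
    }
    return total;
  }
}

public class CompositeDemo {
  public static void main(String[] args) {
    SimpleQueue root = new SimpleQueue();
    SimpleQueue leaf = new SimpleQueue();
    leaf.add(new SimpleApp(2048));
    leaf.add(new SimpleApp(1024));
    root.add(leaf);
    System.out.println(root.getDemand()); // prints 3072
  }
}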
For the rootQueue.updateDemand() call above, we can see from the code that updateDemand() propagates from FSParentQueue to FSLeafQueue, and finally down to the applications:
// In FSParentQueue:
@Override
public void updateDemand() {
  // Compute demand by iterating through child queues,
  // limiting demand to maxResources.
  Resource maxRes = queueMgr.getMaxResources(getName());
  demand = Resources.createResource(0);
  // Add each child queue's demand to this queue's demand.
  for (FSQueue childQueue : childQueues) {
    childQueue.updateDemand();
    Resource toAdd = childQueue.getDemand();
    demand = Resources.add(demand, toAdd);
    demand = Resources.componentwiseMin(demand, maxRes);
    if (Resources.equals(demand, maxRes)) {
      break;
    }
  }
}

// In FSLeafQueue:
@Override
public void updateDemand() {
  // Compute demand by iterating through apps in the queue,
  // limiting demand to maxResources.
  Resource maxRes = queueMgr.getMaxResources(getName());
  demand = Resources.createResource(0);
  // Add each application's demand to this queue's demand.
  for (AppSchedulable sched : appScheds) {
    sched.updateDemand();
    Resource toAdd = sched.getDemand();
    demand = Resources.add(demand, toAdd);
    demand = Resources.componentwiseMin(demand, maxRes);
    if (Resources.equals(demand, maxRes)) {
      break;
    }
  }
}

// In AppSchedulable:
@Override
public void updateDemand() {
  demand = Resources.createResource(0);
  // Demand is current consumption plus outstanding requests
  Resources.addTo(demand, app.getCurrentConsumption());
  // Add up outstanding resource requests
  for (Priority p : app.getPriorities()) {
    for (ResourceRequest r : app.getResourceRequests(p).values()) {
      Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
      Resources.addTo(demand, total);
    }
  }
}
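As a worked example of the arithmetic in AppSchedulable.updateDemand() (the numbers are hypothetical): if an application currently consumes 4096 MB and has two outstanding ResourceRequests, one for 3 containers of 1024 MB and one for 2 containers of 2048 MB, its demand is 4096 + 3 × 1024 + 2 × 2048 = 11264 MB. FSLeafQueue.updateDemand() then sums such per-application demands, capping the total at the queue's maxResources.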
Likewise, rootQueue.recomputeShares() follows almost the same design pattern and logic as rootQueue.updateDemand():
// In FSParentQueue:
@Override
public void recomputeShares() {
  // Compute the FairShare of each child queue on the basis of this queue's FairShare.
  policy.computeShares(childQueues, getFairShare());
  for (FSQueue childQueue : childQueues) {
    childQueue.getMetrics().setFairShare(childQueue.getFairShare());
    // Invoke recomputeShares() on each child queue in turn.
    childQueue.recomputeShares();
  }
}

// In FSLeafQueue:
@Override
public void recomputeShares() {
  // Compute the FairShare of each application on the basis of this queue's FairShare.
  policy.computeShares(getAppSchedulables(), getFairShare());
}
The SchedulingPolicy mentioned above, whose purpose is to compute a specific resource amount (for FairScheduler, the FairShare), has three implementation classes: DominantResourceFairnessPolicy, FairSharePolicy, and FifoPolicy.
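To give a feel for what computeShares() does, here is a simplified sketch of my own, not the real FairSharePolicy (the real code also honors minShare and redistributes surplus from demand-capped children, using a binary search): each child receives a slice of the parent's share proportional to its weight, capped at its demand.

import java.util.Arrays;

public class FairShareSketch {
  // Hypothetical helper: split 'total' among children in proportion to
  // their weights, never giving a child more than it demands.
  static double[] computeShares(double[] weights, double[] demands, double total) {
    double weightSum = Arrays.stream(weights).sum();
    double[] shares = new double[weights.length];
    for (int i = 0; i < weights.length; i++) {
      double proportional = total * weights[i] / weightSum; // weighted slice
      shares[i] = Math.min(proportional, demands[i]);       // cap at demand
    }
    return shares;
  }

  public static void main(String[] args) {
    // Two queues with weights 1 and 3 sharing 8192 MB:
    double[] shares = computeShares(new double[]{1, 3},
                                    new double[]{4096, 8192}, 8192);
    System.out.println(Arrays.toString(shares)); // [2048.0, 6144.0]
  }
}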
Step 4: Preemption In Fair Scheduler
The entry point is 'FairScheduler.preemptTasksIfNecessary()':
/**
 * Check for queues that need tasks preempted, either because they have been
 * below their guaranteed share for minSharePreemptionTimeout or they have
 * been below half their fair share for the fairSharePreemptionTimeout. If
 * such queues exist, compute how many tasks of each type need to be preempted
 * and then select the right ones using preemptTasks.
 */
protected synchronized void preemptTasksIfNecessary() {
  // Check whether preemption is on via parameter 'yarn.scheduler.fair.preemption' in yarn-site.xml
  if (!preemptionEnabled) {
    return;
  }

  long curTime = clock.getTime();
  // Check on the preemption interval
  if (curTime - lastPreemptCheckTime < preemptionInterval) {
    return;
  }
  lastPreemptCheckTime = curTime;

  Resource resToPreempt = Resources.none();
  for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
    // Sum up the amount of resource needed to preempt
    resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
  }
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
      Resources.none())) {
    // Start to preempt
    preemptResources(queueMgr.getLeafQueues(), resToPreempt);
  }
}
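For reference, this is how preemption would be switched on in yarn-site.xml. The first property is the one checked above; the other two names are my assumption of the knobs backing the preemptionInterval and waitTimeBeforeKill fields read in reinitialize(), with illustrative values in milliseconds:

<!-- Enable Fair Scheduler preemption (checked by preemptTasksIfNecessary()) -->
<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>
<!-- Assumed knobs matching preemptionInterval / waitTimeBeforeKill -->
<property>
  <name>yarn.scheduler.fair.preemptionInterval</name>
  <value>5000</value>
</property>
<property>
  <name>yarn.scheduler.fair.waitTimeBeforeKill</name>
  <value>15000</value>
</property>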
Within it, resToPreempt() implements the calculation of how much resource needs to be preempted for a given queue:
/**
 * Return the resource amount that this queue is allowed to preempt, if any.
 * If the queue has been below its min share for at least its preemption
 * timeout, it should preempt the difference between its current share and
 * this min share. If it has been below half its fair share for at least the
 * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
 * full fair share. If both conditions hold, we preempt the max of the two
 * amounts (this shouldn't happen unless someone sets the timeouts to be
 * identical for some reason).
 */
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
  String queue = sched.getName();
  long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
  long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
  Resource resDueToMinShare = Resources.none();
  Resource resDueToFairShare = Resources.none();

  // Min Share Preemption:
  // MAX(0, MIN(minShare, demand) - haveUsed)
  if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
        sched.getMinShare(), sched.getDemand());
    resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
        Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }

  // Fair Share Preemption:
  // MAX(0, MIN(fairShare, demand) - haveUsed)
  if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
        sched.getFairShare(), sched.getDemand());
    resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
        Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }

  // MAX(Min Share Preemption, Fair Share Preemption)
  Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
      resDueToMinShare, resDueToFairShare);
  return resToPreempt;
}
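A quick worked example of the two formulas (hypothetical numbers, assuming both timeouts have expired): suppose a queue has minShare = 4096 MB, fairShare = 8192 MB, demand = 10240 MB, and current usage = 2048 MB. Then resDueToMinShare = MAX(0, MIN(4096, 10240) − 2048) = 2048 MB and resDueToFairShare = MAX(0, MIN(8192, 10240) − 2048) = 6144 MB, so the queue is allowed to preempt MAX(2048, 6144) = 6144 MB.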
The main preemption logic is in 'FairScheduler.preemptResources()':
/**
 * Preempt a quantity of resources from a list of QueueSchedulables. The
 * policy for this is to pick apps from queues that are over their fair share,
 * but make sure that no queue is placed below its fair share in the process.
 * We further prioritize preemption by choosing containers with lowest
 * priority to preempt.
 */
protected void preemptResources(Collection<FSLeafQueue> scheds,
    Resource toPreempt) {
  // Bookkeeping maps from container to its app and queue
  // (their declarations were elided in the original excerpt).
  Map<RMContainer, FSSchedulerApp> apps =
      new HashMap<RMContainer, FSSchedulerApp>();
  Map<RMContainer, FSLeafQueue> queues =
      new HashMap<RMContainer, FSLeafQueue>();

  // Collect running containers from over-scheduled queues
  List<RMContainer> runningContainers = new ArrayList<RMContainer>();
  for (FSLeafQueue sched : scheds) {
    // Put all applications whose occupied resources exceed their FairShare
    // into the to-preempt candidate list.
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
        sched.getResourceUsage(), sched.getFairShare())) {
      for (AppSchedulable as : sched.getAppSchedulables()) {
        for (RMContainer c : as.getApp().getLiveContainers()) {
          runningContainers.add(c);
          apps.put(c, as.getApp());
          queues.put(c, sched);
        }
      }
    }
  }

  // Sort containers into reverse order of priority
  Collections.sort(runningContainers, new Comparator<RMContainer>() {
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(
          c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  });

  // Scan down the list of containers we've already warned and kill them
  // if we need to. Remove any containers from the list that we don't need
  // or that are no longer running.
  Iterator<RMContainer> warnedIter = warnedContainers.iterator();
  Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
  while (warnedIter.hasNext()) {
    RMContainer container = warnedIter.next();
    if (container.getState() == RMContainerState.RUNNING &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
            toPreempt, Resources.none())) {
      warnOrKillContainer(container, apps.get(container), queues.get(container));
      preemptedThisRound.add(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    } else {
      warnedIter.remove();
    }
  }

  // Scan down the rest of the containers until we've preempted enough, making
  // sure we don't preempt too many from any queue
  Iterator<RMContainer> runningIter = runningContainers.iterator();
  while (runningIter.hasNext() &&
      Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
          toPreempt, Resources.none())) {
    RMContainer container = runningIter.next();
    FSLeafQueue sched = queues.get(container);
    if (!preemptedThisRound.contains(container) &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
            sched.getResourceUsage(), sched.getFairShare())) {
      warnOrKillContainer(container, apps.get(container), sched);
      warnedContainers.add(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    }
  }
}
The final kill logic is in FairScheduler.warnOrKillContainer(). As stated in the official documentation, preemption in Hadoop-2.2.0 is experimental, which can also be seen from the TODO in this method:

private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
    FSLeafQueue queue) {
  Long time = app.getContainerPreemptionTime(container);
  if (time != null) {
    // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
    // proceed with kill
    if (time + waitTimeBeforeKill < clock.getTime()) {
      ContainerStatus status =
          SchedulerUtils.createPreemptedContainerStatus(
              container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

      // TODO: Not sure if this ever actually adds this to the list of cleanup
      // containers on the RMNode (see SchedulerNode.releaseContainer()).
      completedContainer(container, status, RMContainerEventType.KILL);
      LOG.info("Killing container " + container +
          " (after waiting for preemption for " +
          (clock.getTime() - time) + "ms)");
    }
  } else {
    // track the request in the FSSchedulerApp itself
    app.addPreemption(container, clock.getTime());
  }
}
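To put hypothetical numbers on this warn-then-kill flow: with waitTimeBeforeKill = 15000 ms, the first preemptResources() round that selects a container merely records a timestamp t0 via app.addPreemption(), and the container keeps running. In later rounds the container is revisited through warnedContainers, and the first round at which t0 + 15000 < clock.getTime() builds a PREEMPTED ContainerStatus and kills it via RMContainerEventType.KILL.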
According to our experiment (related post: Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption), preemption does work in Hadoop-2.2.0.
Lastly, here's a sequence diagram of all the essential invocations within the scope of FairScheduler:
© 2014-2017 jason4zhu.blogspot.com All Rights Reserved
If reposting, please cite the origin: Jason4Zhu