Tuesday, November 4, 2014

Fair Scheduler In YARN, Hadoop-2.2.0 - Overall Introduction

Part 1. Introduction


Fair Scheduler is one of the schedulers in the ResourceManager of YARN. Quoted from the official document: fair scheduling is a method of assigning resources to jobs such that all jobs get, on average, an equal share of resources over time. When there is a single job running, that job uses the entire cluster. When other jobs are submitted, task slots that free up are assigned to the new jobs, so that each job gets roughly the same amount of CPU time. Unlike the default Hadoop scheduler (FIFO Scheduler), which forms a queue of jobs, this lets short jobs finish in reasonable time while not starving long jobs. It is also an easy way to share a cluster among multiple users.

Part 2. Configuration


To enable Fair Scheduler, we can simply add the following to yarn-site.xml:
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

After switching to Fair Scheduler, we can configure some auxiliary parameters in yarn-site.xml. They are well explained in the official document:

yarn.scheduler.fair.allocation.file
Path to allocation file. An allocation file is an XML manifest describing queues and their properties, in addition to certain policy defaults. This file must be in the XML format described in the next section. If a relative path is given, the file is searched for on the classpath (which typically includes the Hadoop conf directory). Defaults to fair-scheduler.xml.

yarn.scheduler.fair.user-as-default-queue
Whether to use the username associated with the allocation as the default queue name, in the event that a queue name is not specified. If this is set to "false" or unset, all jobs have a shared default queue, named "default". Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored.

yarn.scheduler.fair.preemption
Whether to use preemption. Note that preemption is experimental in the current version. Defaults to false.
The source code of preemption is analyzed in Fair Scheduler In YARN, Hadoop-2.2.0 - Deep Into Code, and an experiment on it is shown in Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption.

yarn.scheduler.fair.sizebasedweight
Whether to assign shares to individual apps based on their size, rather than providing an equal share to all apps regardless of size. When set to true, apps are weighted by the natural logarithm of one plus the app's total requested memory, divided by the natural logarithm of 2. Defaults to false.

yarn.scheduler.fair.assignmultiple
Whether to allow multiple container assignments in one heartbeat. Defaults to false.

yarn.scheduler.fair.max.assign
If assignmultiple is true, the maximum amount of containers that can be assigned in one heartbeat. Defaults to -1, which sets no limit.

yarn.scheduler.fair.locality.threshold.node
For applications that request containers on particular nodes, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another node. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities.

yarn.scheduler.fair.locality.threshold.rack
For applications that request containers on particular racks, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another rack. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities.

yarn.scheduler.fair.allow-undeclared-pools
If this is true, new queues can be created at application submission time, whether because they are specified as the application's queue by the submitter or because they are placed there by the user-as-default-queue property. If this is false, any time an app would be placed in a queue that is not specified in the allocations file, it is placed in the "default" queue instead. Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored.
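
For instance, to turn preemption on and allow up to three container assignments per heartbeat, one might add something like the following to yarn-site.xml (the property names come from the list above; the values are merely illustrative):

<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>
<property>
  <name>yarn.scheduler.fair.assignmultiple</name>
  <value>true</value>
</property>
<property>
  <name>yarn.scheduler.fair.max.assign</name>
  <value>3</value>
</property>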

Moreover, regarding the Allocation File, which must be in XML format and is specified by 'yarn.scheduler.fair.allocation.file', the relevant parameters are as follows:

Queue elements
which represent queues. Each may contain the following properties:

minResources: minimum resources the queue is entitled to, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores value is ignored. If a queue's minimum share is not satisfied, it will be offered available resources before any other queue under the same parent. Under the single-resource fairness policy, a queue is considered unsatisfied if its memory usage is below its minimum memory share. Under dominant resource fairness, a queue is considered unsatisfied if its usage for its dominant resource with respect to the cluster capacity is below its minimum share for that resource. If multiple queues are unsatisfied in this situation, resources go to the queue with the smallest ratio between relevant resource usage and minimum. Note that it is possible that a queue that is below its minimum may not immediately get up to its minimum when it submits an application, because already-running jobs may be using those resources.

maxResources: maximum resources a queue is allowed, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores value is ignored. A queue will never be assigned a container that would put its aggregate usage over this limit.

maxRunningApps: limit the number of apps from the queue to run at once

weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight.

schedulingPolicy: to set the scheduling policy of any queue. The allowed values are "fifo"/"fair"/"drf" or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. Defaults to "fair". If "fifo", apps with earlier submit times are given preference for containers, but apps submitted later may run concurrently if there is leftover space on the cluster after satisfying the earlier app's requests.
This determines how applications within this queue are scheduled; the fair scheduling policy is always applied between queues.

aclSubmitApps: a list of users and/or groups that can submit apps to the queue. Refer to the ACLs section below for more info on the format of this list and how queue ACLs work.

aclAdministerApps: a list of users and/or groups that can administer a queue. Currently the only administrative action is killing an application. Refer to the ACLs section below for more info on the format of this list and how queue ACLs work.

minSharePreemptionTimeout: number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues.

User elements
which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the number of running apps for a particular user.

A userMaxAppsDefault element
which sets the default running app limit for any users whose limit is not otherwise specified.

A fairSharePreemptionTimeout element
number of seconds a queue is under its fair share before it will try to preempt containers to take resources from other queues.
This parameter takes effect only when 'yarn.scheduler.fair.preemption' in yarn-site.xml is set to true.

A defaultMinSharePreemptionTimeout element
which sets the default number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues; overridden by the minSharePreemptionTimeout element in each queue if specified.

A queueMaxAppsDefault element
which sets the default running app limit for queues; overridden by the maxRunningApps element in each queue.

A defaultQueueSchedulingPolicy element
which sets the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified. Defaults to "fair".

A queuePlacementPolicy element
which contains a list of rule elements that tell the scheduler how to place incoming apps into queues. Rules are applied in the order that they are listed. Rules may take arguments. All rules accept the "create" argument, which indicates whether the rule can create a new queue. "Create" defaults to true; if set to false and the rule would place the app in a queue that is not configured in the allocations file, we continue on to the next rule. The last rule must be one that can never issue a continue. Valid rules are:

specified: the app is placed into the queue it requested. If the app requested no queue, i.e. it specified "default", we continue.

user: the app is placed into a queue with the name of the user who submitted it.

primaryGroup: the app is placed into a queue with the name of the primary group of the user who submitted it.

secondaryGroupExistingQueue: the app is placed into a queue with a name that matches a secondary group of the user who submitted it. The first secondary group that matches a configured queue will be selected.

default: the app is placed into the queue named "default".

reject: the app is rejected.

An example of Allocation File is as follows:
<?xml version="1.0"?>
<allocations>
  <queue name="sample_queue">
    <minResources>10000 mb,0vcores</minResources>
    <maxResources>90000 mb,0vcores</maxResources>
    <maxRunningApps>50</maxRunningApps>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <queue name="sample_sub_queue">
      <aclSubmitApps>charlie</aclSubmitApps>
      <minResources>5000 mb,0vcores</minResources>
    </queue>
  </queue>
 
  <user name="sample_user">
    <maxRunningApps>30</maxRunningApps>
  </user>
  <userMaxAppsDefault>5</userMaxAppsDefault>
 
  <queuePlacementPolicy>
    <rule name="specified" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" />
  </queuePlacementPolicy>
</allocations>

All the parameters are well explained above. When changing parameters while YARN is running, those in yarn-site.xml require restarting YARN to take effect, whereas those in the Allocation File (fair-scheduler.xml) are reloaded periodically.

Part 3. Fair Scheduler Algorithm

There are four key quantities in Fair Scheduler, namely MinResources, MaxResources, Demand, and FairShare. The first two are configured in the Allocation File (fair-scheduler.xml); the other two are calculated from the resource conditions at the time.

The way to calculate demand is quite simple. According to the implementation code at 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable.updateDemand()', demand is the current consumption plus all outstanding requests.
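
As a rough sketch (hypothetical signature, plain longs instead of the Resource objects and per-priority ResourceRequests the real code iterates over), the calculation amounts to:

// Simplified sketch of AppSchedulable.updateDemand(): demand is what the app
// currently consumes plus everything it is still asking for.
long updateDemand(long currentConsumption, long[] outstandingRequests) {
  long demand = currentConsumption;
  for (long request : outstandingRequests) {
    demand += request;  // add each pending container request
  }
  return demand;
}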

However, the calculation of FairShare is a little more intricate. Reconstructed in plain form, the formula is:

FairShare_i = min( max(R * w_i, m_i), d_i ),  where R is chosen such that Σ_i FairShare_i = t

'd' is for demand, 'w' is for weight, 'm' is for MinResources, 't' is for total cluster resources.

There are two cases to analyze; to simplify matters, let's assume the weight is always 1:

If demand is below MinResources, then FairShare equals the current demand. Although MinResources is not met, there is no need to claim as much as MinResources to run the application, which sounds reasonable.

If demand is above MinResources, the queue will get at least MinResources for its applications to run. That is to say, we should size MinResources carefully for every queue, since it acts as the lower bound for all applications running in that queue.

What Fair Scheduler intends to achieve is to find a proper R that makes the equation hold; that is, to find a FairShare for every application such that, summed up, they consume all the resources in the cluster. The implementation code is at 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares.computeShares()', whose comment is quoted below:
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares of the Schedulables are assumed to
* be set beforehand. We compute the fairest possible allocation of shares to
* the Schedulables that respects their min and max shares.
* 
* To understand what this method does, we must first define what weighted
* fair sharing means in the presence of min and max shares. If there
* were no minimum or maximum shares, then weighted fair sharing would be
* achieved if the ratio of slotsAssigned / weight was equal for each
* Schedulable and all slots were assigned. Minimum and maximum shares add a
* further twist - Some Schedulables may have a min share higher than their
* assigned share or a max share lower than their assigned share.
* 
* To deal with these possibilities, we define an assignment of slots as being
* fair if there exists a ratio R such that: Schedulables S where S.minShare
* > R * S.weight are given share S.minShare - Schedulables S where S.maxShare
* < R * S.weight are given S.maxShare - All other Schedulables S are
* assigned share R * S.weight - The sum of all the shares is totalSlots.
* 
* We call R the weight-to-slots ratio because it converts a Schedulable's
* weight to the number of slots it is assigned.
* 
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
* To do this, we use binary search. Given a ratio R, we compute the number of
* slots that would be used in total with this ratio (the sum of the shares
* computed using the conditions above). If this number of slots is less than
* totalSlots, then R is too small and more slots could be assigned. If the
* number of slots is more than totalSlots, then R is too large.
* 
* We begin the binary search with a lower bound on R of 0 (which means that
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* use more than totalResources resources). The helper method
* resourceUsedWithWeightToResourceRatio computes the total resources used with a
* given value of R.
* 
* The running time of this algorithm is linear in the number of Schedulables,
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
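
To make the algorithm concrete, here is a minimal, self-contained sketch of the binary search described in that comment, simplified to a single resource expressed as plain doubles (the real ComputeFairShares works on Hadoop Resource objects):

public class FairShareSketch {

  // Share of one schedulable at ratio r: r * weight, clamped between
  // its min share and its demand.
  static double shareAt(double r, double w, double m, double d) {
    return Math.min(Math.max(r * w, m), d);
  }

  // Total resources that would be handed out at ratio r.
  static double totalUsedAt(double r, double[] w, double[] m, double[] d) {
    double sum = 0;
    for (int i = 0; i < w.length; i++) {
      sum += shareAt(r, w[i], m[i], d[i]);
    }
    return sum;
  }

  // Find R such that the shares sum to 'total', then return every share.
  static double[] computeShares(double total, double[] w, double[] m, double[] d) {
    double rMax = 1.0;
    while (totalUsedAt(rMax, w, m, d) < total && rMax < 1e12) {
      rMax *= 2.0;  // double R until too many resources would be used
    }
    double left = 0.0, right = rMax;
    for (int iter = 0; iter < 25; iter++) {  // fixed iteration count, as in the comment
      double mid = (left + right) / 2.0;
      if (totalUsedAt(mid, w, m, d) < total) {
        left = mid;
      } else {
        right = mid;
      }
    }
    double[] shares = new double[w.length];
    for (int i = 0; i < w.length; i++) {
      shares[i] = shareAt(right, w[i], m[i], d[i]);
    }
    return shares;
  }

  public static void main(String[] args) {
    // 30 GB cluster, two queues of weight 1:
    // A has demand 10 and MinResources 4; B has demand 40 and MinResources 10.
    double[] shares = computeShares(30,
        new double[]{1, 1}, new double[]{4, 10}, new double[]{10, 40});
    System.out.printf("A=%.1f GB, B=%.1f GB%n", shares[0], shares[1]);
  }
}

Running this yields A=10.0 GB and B=20.0 GB: the search settles around R = 20, queue A is capped at its own demand, and queue B absorbs the remaining resources.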

When there are plenty of available resources in the cluster, a newly submitted application will take up as many resources as possible, as long as it stays below its queue's MaxResources. However, as other applications come in, Fair Scheduler calculates a FairShare for every application and queue.

If preemption is off, resources released in the future by currently running applications will preferentially be scheduled to applications whose occupied resources are below their FairShare. If preemption is on, some resources will be preempted from currently running applications whose occupied resources are above their FairShare. Either way, Fair Scheduler schedules resources among queues and among applications as 'fairly' as possible.

The amount to preempt is calculated by the following formula (reconstructed in plain form):

ResToPreempt = max( MinSharePreemption, FairSharePreemption )

where MinSharePreemption = max( 0, min(MinResources, Demand) - Usage ) once the queue has been below its minimum share for longer than minSharePreemptionTimeout, and FairSharePreemption = max( 0, min(FairShare, Demand) - Usage ) once the queue has been below half its fair share for longer than fairSharePreemptionTimeout.
As we can see, the amount that will be preempted is the max between MinShare Preemption and FairShare Preemption. The corresponding implementation code is at 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.resToPreempt()'.
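
In simplified single-resource form, the logic of that method might be sketched as follows (a rough sketch with plain longs, not the actual Hadoop code, which operates on Resource objects):

// Rough single-resource sketch of FairScheduler.resToPreempt() for one queue.
// Each term only applies once the queue has spent longer than the corresponding
// preemption timeout below its minimum share (resp. half its fair share).
long resToPreempt(long usage, long demand, long minShare, long fairShare,
                  boolean minShareTimeoutExpired, boolean fairShareTimeoutExpired) {
  long dueToMinShare = 0;
  if (minShareTimeoutExpired) {
    long target = Math.min(minShare, demand);  // never preempt beyond actual demand
    dueToMinShare = Math.max(0, target - usage);
  }
  long dueToFairShare = 0;
  if (fairShareTimeoutExpired) {
    long target = Math.min(fairShare, demand);
    dueToFairShare = Math.max(0, target - usage);
  }
  return Math.max(dueToMinShare, dueToFairShare);  // the larger of the two deficits
}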




© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If reposting, please cite the source: Jason4Zhu

1 comment:

  1. Tried to configure the cluster using the recommendations mentioned here. Seen a strange behavior:

    a) configured cluster to use fair scheduler
    b) allocated 36 GB to yarn out of 64 GB available on data nodes
    c) ran MR job but the job always uses 50% of the resources, i.e. memory.
    d) I see only 18 GB getting utilized by mappers and the rest 18 GB is still free
    e) what can I do to make my MR job use the entire configured memory?
    f) how do I tell yarn to use the entire 36 GB of configured memory?