like Torque [12], significantly degrade performance, be-
cause files in Hadoop are distributed across all nodes as
in GFS [19]. Grid schedulers like Condor [22] support lo-
cality constraints, but only at the level of geographic sites,
not of machines, because they run CPU-intensive appli-
cations rather than data-intensive workloads like MapRe-
duce. Even with a granular fair scheduler, we found that lo-
cality suffered in two situations: concurrent jobs and small
jobs. We address this problem through a technique called
delay scheduling that can double throughput.
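As a preview of the idea (the full algorithm appears in Section 4), the sketch below shows the core of delay scheduling: when the job at the head of the queue has no data-local task for the node requesting work, the scheduler skips that job for a bounded number of scheduling opportunities instead of immediately launching a non-local task. The names (`pending_local_task`, `any_pending_task`, `skip_count`, `MAX_SKIPS`) are our own illustration, not the actual implementation.

```python
MAX_SKIPS = 3  # illustrative bound on how long a job may wait for locality

def assign_map_task(jobs, node):
    """Pick a map task for `node`, preferring data-local work (sketch only)."""
    for job in jobs:  # jobs are assumed pre-sorted by the fairness policy
        local = job.pending_local_task(node)
        if local is not None:
            job.skip_count = 0          # got a local task; reset its patience
            return local
        # No local data on this node: let the job wait a few scheduling
        # opportunities rather than launching a non-local task right away.
        job.skip_count += 1
        if job.skip_count >= MAX_SKIPS:
            job.skip_count = 0
            return job.any_pending_task()   # give up on locality for now
    return None  # nothing to launch on this heartbeat
```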
The second aspect of MapReduce that causes problems
is the dependence between map and reduce tasks: Re-
duce tasks cannot finish until all the map tasks in their
job are done. This interdependence, not present in tradi-
tional cluster scheduling models, can lead to underutiliza-
tion and starvation: a long-running job that acquires reduce
slots on many machines will not release them until its map
phase finishes, starving other jobs while underutilizing the
reserved slots. We propose a simple technique called copy-
compute splitting to address this problem, yielding 2-10x
gains in throughput and response time. The reduce/map de-
pendence also creates other dynamics not present in other
settings: for example, even with well-behaved jobs, fair
sharing in MapReduce can take longer to finish a batch
of jobs than FIFO; this is not true in settings such as
packet scheduling, where fair sharing is work-conserving.
Yet another issue is that intermediate results produced by
the map phase cannot be deleted until the job ends, con-
suming disk space. We explore these issues in Section 7.
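To make the slot-hoarding dynamic concrete, the sketch below contrasts counting a reduce task against a node's slots for its entire lifetime with the copy-compute splitting idea of only charging it while it is actually computing. This is our simplified illustration of the technique described in Section 5, not its implementation; the data layout and function names are ours.

```python
# Illustrative only: a reduce task spends a long "copy" phase fetching map
# outputs before it can "compute" on them. Under the plain slot model it
# occupies a reduce slot for that whole time; with copy-compute splitting,
# only reduces that are actually computing count against the compute limit.

def can_admit_plain(running_reduces, slots):
    """Plain slot model: every running reduce, copying or not, holds a slot."""
    return len(running_reduces) < slots

def can_admit_split(running_reduces, compute_slots):
    """Simplified copy-compute splitting: copying reduces don't block admission."""
    computing = [r for r in running_reduces if r["phase"] == "compute"]
    return len(computing) < compute_slots

# A long-running job holds four reduces that are all still copying because
# its map phase has not finished yet.
long_job = [{"job": "long", "phase": "copy"} for _ in range(4)]
print(can_admit_plain(long_job, slots=4))          # False: a small job starves
print(can_admit_split(long_job, compute_slots=4))  # True: the small job can run
```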
Although we motivate our work with the Facebook case
study, the problems we address are by no means con-
strained to a data warehousing workload. Our contacts at
another major web company using Hadoop confirm that
the biggest complaint users have about the research clus-
ters there is long queueing delays. Our work is also rel-
evant to the several academic Hadoop clusters that have
been announced. One such cluster is already using our fair
scheduler on 2000 nodes. In general, effective scheduling
is more important in data-intensive cluster computing than
in other settings because the resource being shared (a clus-
ter) is very expensive and because data is hard to move (so
data consolidation provides significant value).
The rest of this paper is organized as follows. Section 2
provides background on Hadoop and problems with previ-
ous scheduling solutions for Hadoop, including a Torque-
based scheduler. Section 3 presents our fair scheduler.
Section 4 describes data locality problems and our delay
scheduling technique to address them. Section 5 describes
problems caused by reduce/map interdependence and our
copy-compute splitting technique to mitigate them. We
evaluate our algorithms in Section 6. Section 7 discusses
scheduling tradeoffs in MapReduce and general lessons for
job scheduling in cluster programming systems. We survey
related work in Section 8 and conclude in Section 9.
Figure 1: Data flow in MapReduce. Figure from [4].
2 Background
Hadoop’s implementation of MapReduce resembles that of
Google [16]. Hadoop runs on top of a distributed file sys-
tem, HDFS, which stores three replicas of each block, as
GFS does [19]. Users submit jobs consisting of a map function
and a reduce function. Hadoop breaks each job into mul-
tiple tasks. First, map tasks process each block of input
(typically 64 MB) and produce intermediate results, which
are key-value pairs. These are saved to disk. Next, re-
duce tasks fetch the list of intermediate results associated
with each key and run it through the user’s reduce func-
tion, which produces output. Each reducer is responsible
for a different portion of the key space. Figure 1 illustrates
a MapReduce computation.
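To make this data flow concrete, here is a minimal word-count sketch in Python. The `map_fn`, `reduce_fn`, and in-memory `run_job` driver are our own illustration of the model described above, not Hadoop's API; in Hadoop, the intermediate pairs would be spilled to disk on the slaves and fetched over the network by reducers.

```python
from collections import defaultdict

# Hypothetical user-supplied functions for a word-count job.
def map_fn(block):
    """Map task: turn one input block into intermediate key-value pairs."""
    for word in block.split():
        yield (word, 1)

def reduce_fn(key, values):
    """Reduce task: combine all values for one key into a final record."""
    return (key, sum(values))

def run_job(blocks, num_reducers=2):
    # Map phase: each block is processed independently; in Hadoop the
    # intermediate pairs would be written to local disk on each slave.
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for block in blocks:
        for key, value in map_fn(block):
            # Each reducer owns a slice of the key space (here chosen by hash).
            partitions[hash(key) % num_reducers][key].append(value)

    # Reduce phase: each reducer fetches the intermediate values for its keys
    # and runs them through the user's reduce function.
    output = []
    for part in partitions:
        for key, values in part.items():
            output.append(reduce_fn(key, values))
    return output

print(run_job(["the quick brown fox", "the lazy dog jumps over the fox"]))
```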
Job scheduling in Hadoop is performed by a master
node, which distributes work to a number of slaves. Tasks
are assigned in response to heartbeats (status messages) re-
ceived from the slaves every few seconds. Each slave has a
fixed number of map slots and reduce slots for tasks. Typi-
cally, Hadoop tasks are single-threaded, so there is one slot
per core. Although the slot model can sometimes under-
utilize resources (e.g., when there are no reduces to run), it
makes managing memory and CPU on the slaves easy. For
example, reduces tend to use more memory than maps, so it
is useful to limit their number. Hadoop is moving towards
more dynamic load management for slaves, such as taking
into account tasks’ memory requirements [5]. In this pa-
per, we focus on scheduling problems above the slave level.
The issues we identify and the techniques we develop are
independent of the slave load management mechanism.
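As an illustration of the slot model, the sketch below shows a slave reporting its free map and reduce slots in a heartbeat and a master handing back at most that many tasks. The class and method names are ours, not Hadoop's JobTracker/TaskTracker interfaces, and the per-job `get_pending_task` interface is assumed for the sketch.

```python
from dataclasses import dataclass

@dataclass
class Slave:
    # Fixed per-node capacities; typically one slot per core.
    map_slots: int
    reduce_slots: int
    running_maps: int = 0
    running_reduces: int = 0

    def heartbeat(self):
        """Status message sent to the master every few seconds."""
        return {
            "free_map_slots": self.map_slots - self.running_maps,
            "free_reduce_slots": self.reduce_slots - self.running_reduces,
        }

class Master:
    def __init__(self, jobs):
        self.jobs = jobs  # each job is assumed to expose get_pending_task(kind)

    def on_heartbeat(self, slave, status):
        """Assign tasks in response to a slave heartbeat, one per free slot."""
        assignments = []
        for _ in range(status["free_map_slots"]):
            task = self.next_task("map")
            if task is None:
                break
            assignments.append(task)
            slave.running_maps += 1
        for _ in range(status["free_reduce_slots"]):
            task = self.next_task("reduce")
            if task is None:
                break
            assignments.append(task)
            slave.running_reduces += 1
        return assignments

    def next_task(self, kind):
        # Placeholder for the scheduling policy (FIFO, fair sharing, ...).
        for job in self.jobs:
            task = job.get_pending_task(kind)
            if task is not None:
                return task
        return None
```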
2.1 Previous Scheduling Solutions for Hadoop
Hadoop’s built-in scheduler runs jobs in FIFO order, with
five priority levels. When a task slot becomes free, the
scheduler scans through jobs in order of priority and submit
time to find one with a task of the required type.¹ For maps,
the scheduler uses a locality optimization as in Google's

¹ With memory-aware load management [5], there is also a check that the slave has enough memory for the task.