workers executing reduce tasks are notified of the re-
execution. Any reduce task that has not already read the
data from worker A will read the data from worker B.
MapReduce is resilient to large-scale worker failures.
For example, during one MapReduce operation, network
maintenance on a running cluster was causing groups of
80 machines at a time to become unreachable for sev-
eral minutes. The MapReduce master simply re-executed
the work done by the unreachable worker machines, and
continued to make forward progress, eventually complet-
ing the MapReduce operation.
Master Failure
It is easy to make the master write periodic checkpoints
of the master data structures described above. If the mas-
ter task dies, a new copy can be started from the last
checkpointed state. However, given that there is only a
single master, its failure is unlikely; therefore our cur-
rent implementation aborts the MapReduce computation
if the master fails. Clients can check for this condition
and retry the MapReduce operation if they desire.
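As a rough illustration only (the data-structure names, checkpoint path, and period below are assumptions, not the published implementation), a periodic checkpoint of the master's data structures might be sketched in C++ as follows:

  #include <chrono>
  #include <cstdio>
  #include <fstream>
  #include <string>
  #include <thread>

  // Hypothetical master state: in the real master this would cover task
  // status, worker assignments, and locations of intermediate files.
  struct MasterState {
    std::string serialized_snapshot;
    std::string Serialize() const { return serialized_snapshot; }
  };

  // Periodically write the state to a temporary file, then atomically
  // replace the previous checkpoint so a new master copy can resume
  // from the last complete checkpoint.
  void CheckpointLoop(const MasterState& state, const std::string& path) {
    while (true) {
      std::this_thread::sleep_for(std::chrono::seconds(30));  // assumed period
      std::ofstream tmp(path + ".tmp", std::ios::binary | std::ios::trunc);
      tmp << state.Serialize();
      tmp.close();
      std::rename((path + ".tmp").c_str(), path.c_str());  // atomic replace
    }
  }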
Semantics in the Presence of Failures
When the user-supplied map and reduce operators are de-
terministic functions of their input values, our distributed
implementation produces the same output as would have
been produced by a non-faulting sequential execution of
the entire program.
We rely on atomic commits of map and reduce task
outputs to achieve this property. Each in-progress task
writes its output to private temporary files. A reduce task
produces one such file, and a map task produces R such
files (one per reduce task). When a map task completes,
the worker sends a message to the master and includes
the names of the R temporary files in the message. If
the master receives a completion message for an already
completed map task, it ignores the message. Otherwise,
it records the names of R files in a master data structure.
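As a sketch only (the message and data-structure names here are illustrative, not taken from the paper), the master's idempotent handling of a map-task completion message could look like this in C++:

  #include <map>
  #include <string>
  #include <vector>

  // Hypothetical message: a worker reports the R temporary files
  // produced by one map task, one file per reduce partition.
  struct MapCompletionMsg {
    int map_task_id;
    std::vector<std::string> temp_files;
  };

  class Master {
   public:
    void HandleMapCompletion(const MapCompletionMsg& msg) {
      // Ignore duplicate completions (e.g., from a re-executed or
      // backup copy of the task): only the first message is recorded.
      if (completed_.count(msg.map_task_id)) return;
      completed_[msg.map_task_id] = msg.temp_files;
      // Reduce workers are later given these file names so they can
      // fetch the intermediate data.
    }

   private:
    std::map<int, std::vector<std::string>> completed_;
  };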
When a reduce task completes, the reduce worker
atomically renames its temporary output file to the final
output file. If the same reduce task is executed on multi-
ple machines, multiple rename calls will be executed for
the same final output file. We rely on the atomic rename
operation provided by the underlying file system to guar-
antee that the final file system state contains just the data
produced by one execution of the reduce task.
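A minimal sketch of the commit step on the reduce side, using POSIX rename() as a stand-in for the atomic rename provided by the underlying file system (the function and path names are illustrative):

  #include <cstdio>
  #include <string>

  // Hypothetical commit step: the reduce worker writes its output to a
  // private temporary file and then renames it to the final output name.
  // If several executions of the same reduce task commit, each rename
  // atomically replaces the target, so the final file holds the data of
  // exactly one execution.
  bool CommitReduceOutput(const std::string& temp_file,
                          const std::string& final_file) {
    return std::rename(temp_file.c_str(), final_file.c_str()) == 0;
  }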
The vast majority of our map and reduce operators are
deterministic, and the fact that our semantics are equiv-
alent to a sequential execution in this case makes it very
easy for programmers to reason about their program’s be-
havior. When the map and/or reduce operators are non-
deterministic, we provide weaker but still reasonable se-
mantics. In the presence of non-deterministic operators,
the output of a particular reduce task R1 is equivalent to
the output for R1 produced by a sequential execution of
the non-deterministic program. However, the output for
a different reduce task R2 may correspond to the output
for R2 produced by a different sequential execution of
the non-deterministic program.
Consider map task M and reduce tasks R1 and R2.
Let e(Ri) be the execution of Ri that committed (there
is exactly one such execution). The weaker semantics
arise because e(R1) may have read the output produced
by one execution of M and e(R2) may have read the
output produced by a different execution of M.
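To make the weaker semantics concrete, here is a hypothetical non-deterministic map operator (not from the paper; the Emit stand-in is also assumed). Because two executions of M over the same input can emit different pairs, e(R1) and e(R2) may have consumed intermediate data from different executions of M:

  #include <cstdio>
  #include <cstdlib>
  #include <string>

  // Stand-in emitter for illustration; the real library's interface differs.
  void Emit(const std::string& key, const std::string& value) {
    std::printf("%s\t%s\n", key.c_str(), value.c_str());
  }

  // A non-deterministic map operator: it tags each record with a random
  // sample id, so re-executing the map task on the same input can produce
  // different intermediate key/value pairs.
  void Map(const std::string& key, const std::string& value) {
    int sample_id = std::rand() % 10;
    Emit(key, value + "\t" + std::to_string(sample_id));
  }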
3.4 Locality
Network bandwidth is a relatively scarce resource in our
computing environment. We conserve network band-
width by taking advantage of the fact that the input data
(managed by GFS [8]) is stored on the local disks of the
machines that make up our cluster. GFS divides each
file into 64 MB blocks, and stores several copies of each
block (typically 3 copies) on different machines. The
MapReduce master takes the location information of the
input files into account and attempts to schedule a map
task on a machine that contains a replica of the corre-
sponding input data. Failing that, it attempts to schedule
a map task near a replica of that task’s input data (e.g., on
a worker machine that is on the same network switch as
the machine containing the data). When running large
MapReduce operations on a significant fraction of the
workers in a cluster, most input data is read locally and
consumes no network bandwidth.
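A sketch of this scheduling preference (the worker/replica structures and the same-switch test are assumptions for illustration), choosing in order a worker holding a replica, then a worker on the same switch as a replica, then any idle worker:

  #include <set>
  #include <string>
  #include <vector>

  // Hypothetical descriptions of a map task's input and of idle workers.
  struct MapTaskInput {
    std::set<std::string> replica_hosts;     // machines holding a GFS replica
    std::set<std::string> replica_switches;  // switches of those machines
  };
  struct Worker {
    std::string host;
    std::string network_switch;
  };

  // Returns the index of the preferred idle worker, or -1 if none are idle.
  int PickWorker(const MapTaskInput& input, const std::vector<Worker>& idle) {
    for (size_t i = 0; i < idle.size(); ++i)    // 1) data-local
      if (input.replica_hosts.count(idle[i].host)) return static_cast<int>(i);
    for (size_t i = 0; i < idle.size(); ++i)    // 2) switch-local
      if (input.replica_switches.count(idle[i].network_switch))
        return static_cast<int>(i);
    return idle.empty() ? -1 : 0;               // 3) any idle worker
  }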
3.5 Task Granularity
We subdivide the map phase into M pieces and the re-
duce phase into R pieces, as described above. Ideally, M
and R should be much larger than the number of worker
machines. Having each worker perform many different
tasks improves dynamic load balancing, and also speeds
up recovery when a worker fails: the many map tasks
it has completed can be spread out across all the other
worker machines.
There are practical bounds on how large M and R can
be in our implementation, since the master must make
O(M + R) scheduling decisions and keeps O(M ∗ R)
state in memory as described above. (The constant fac-
tors for memory usage are small however: the O(M ∗ R)
piece of the state consists of approximately one byte of
data per map task/reduce task pair.)
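As a back-of-the-envelope illustration (the values of M and R below are chosen here only for the arithmetic): with M = 200,000 and R = 5,000, M ∗ R = 10^9 map/reduce task pairs, which at roughly one byte per pair is about 1 GB of master state, while M + R = 205,000 scheduling decisions is comparatively modest.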