model as follows:
Map Stage: map(k₁, v₁) ⇒ list(k₂, v₂)
Reduce Stage: reduce(k₂, list(v₂)) ⇒ list(v₃)
In the map stage, map(k, v) reads ⟨k, v⟩ records one by one
from an input split, processes each record, and outputs new
⟨k, v⟩ records. In the reduce stage, the framework groups the
⟨k, v⟩ records into ⟨k, list(v)⟩ by the key k, and then launches
reduce(k, list(v)) to process each ⟨k, list(v)⟩ group. Hadoop
natively supports this programming model, while Dryad and
Spark provide general and user-friendly operators, such as
map(), flatMap(), groupByKey(), reduceByKey(), coGroup(),
and join(), which are built on top of map() and reduce().
Users can also write applications in high-level languages
such as the SQL-like Pig script [10], which is automatically
compiled into map() and reduce(). For optimization, users
can define a mini reduce() named combine(). We regard
combine() as reduce() since they usually share the same code
for aggregation.
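As a hedged illustration of how these operators map onto the model, the classic word count can be written with flatMap(), map(), and reduceByKey() in Spark; the SparkContext setup and HDFS paths below are assumptions, not taken from the original text. Note that reduceByKey() applies the same function on the map side as a combiner, which illustrates why combine() and reduce() usually share the aggregation code.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// A hedged sketch: word count expressed with Spark's operators
// (application name and HDFS paths are illustrative).
val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
sc.textFile("hdfs://input/wiki")                 // read one line per record
  .flatMap(line => line.split("\\s+"))           // map side: emit the words of each line
  .map(word => (word, 1))                        // emit (k2, v2) = (word, 1)
  .reduceByKey(_ + _)                            // reduce side: sum the counts; the same
                                                 // function also runs map-side as a combiner
  .saveAsTextFile("hdfs://output/wordcount")
```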
Apart from map() and reduce(), users need to write a driver
program (shown in Fig. 2) to submit an application to Spark.
The driver program can also (1) generate and broadcast data to
each task, and (2) collect the tasks' outputs. Therefore, in this
paper, we regard map(), reduce(), and the driver program as user code.
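A minimal driver-program sketch is shown below; it is not the driver in Fig. 2, and the broadcast data and RDD contents are illustrative. It demonstrates the two roles mentioned above: broadcasting data to tasks and collecting the tasks' outputs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// A hedged driver-program sketch; the data are illustrative.
val sc = new SparkContext(new SparkConf().setAppName("DriverExample"))

// (1) generate and broadcast data to each task
val stopWords = sc.broadcast(Set("a", "an", "the"))
val filtered = sc.parallelize(Seq("a", "spark", "the", "driver"))
  .filter(word => !stopWords.value.contains(word))

// (2) collect the tasks' outputs back to the driver
val result = filtered.collect()
result.foreach(println)
sc.stop()
```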
B. Dataflow
A distributed data-parallel application consists of one or
more MapReduce jobs. As shown in Fig. 1, a job goes
through a map stage and a reduce stage (a Dryad/Spark
job can go through multiple map and reduce stages connected
as a directed acyclic graph). Each stage contains multiple
map/reduce tasks (i.e., mappers/reducers). For parallelism, the
mappers’ outputs are partitioned and each partition is shuffled
to a corresponding reducer by the framework. Dataflow refers
to the data that flows among mappers and reducers.
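As a hedged sketch of this routing, Spark's default HashPartitioner determines which reducer a ⟨k, v⟩ record is shuffled to; the partition count and key below are illustrative.

```scala
import org.apache.spark.HashPartitioner

// A hedged sketch: with 4 reducers, a map output record's key determines
// the partition (and hence the reducer) it is shuffled to.
val partitioner = new HashPartitioner(4)
val reducerId = partitioner.getPartition("someKey")   // roughly key.hashCode % 4
```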
The major difference between MapReduce and Dryad/Spark
is that Dryad/Spark supports pipelining. With pipelining, map/re-
duce tasks can continuously execute multiple user-defined
functions (e.g., run another map() after a map()) without
storing the intermediate results (e.g., the results of the first map())
to disk. In Spark, users can also explicitly tell the
framework to cache reusable intermediate results in memory
(e.g., outputs of reduce() used for the next job) using cache().
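The following hedged sketch illustrates both mechanisms (paths and functions are illustrative): the two map() calls are pipelined inside one task without materializing the first map()'s output, and cache() keeps the reduce() output in memory for reuse by the next job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// A hedged sketch of pipelining and caching; paths and functions are illustrative.
val sc = new SparkContext(new SparkConf().setAppName("PipelineExample"))
val pairs = sc.textFile("hdfs://input/logs")
  .map(line => line.split("\t"))             // first map(): its output is pipelined into
  .map(fields => (fields(0), 1))             // the second map() without touching disk
val counts = pairs.reduceByKey(_ + _).cache()          // keep the reduce() output in memory
val frequent = counts.filter { case (_, n) => n > 10 }.count()    // job 1 reuses `counts`
counts.map { case (k, n) => s"$k\t$n" }.saveAsTextFile("hdfs://output/counts")  // job 2
```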
C. Configurations
The application’s configurations consist of two parts: (1)
Memory-related configurations directly affect memory usage.
For example, the memory limit defines the memory space
(heap size) of map/reduce tasks, and the buffer size defines the
size of the framework's buffers. (2) Dataflow-related configurations
affect the volume of data that flows among mappers and reducers.
For instance, the partition function defines how to partition the
⟨k, v⟩ records output by map(), while the partition number
defines how many partitions will be generated and how many
reducers will be launched.
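A hedged sketch of both kinds of configurations is shown below, using real Spark property names with illustrative values; the Hadoop-1.x counterparts are noted in the comments.

```scala
import org.apache.spark.SparkConf

// A hedged sketch; the values are illustrative, not recommendations.
val conf = new SparkConf()
  // memory-related: heap size of the JVM that runs the tasks
  // (Hadoop-1.x counterpart: mapred.child.java.opts; buffer size example: io.sort.mb)
  .set("spark.executor.memory", "2g")
  // dataflow-related: default partition number, i.e., how many partitions a
  // shuffle produces and how many reducers are launched
  // (Hadoop-1.x counterpart: mapred.reduce.tasks)
  .set("spark.default.parallelism", "64")
```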
III. METHODOLOGY
A. Subjects
We took real-world data-parallel applications that run atop
Apache Hadoop and Apache Spark as our study subjects.
Since there are no dedicated bug repositories for OOM
errors (JIRA mainly covers framework bugs), users usually
post their OOM errors on open forums (e.g., StackOver-
flow.com and the Hadoop/Spark mailing lists). We found a total of
1151 issues by searching for keywords such as "Hadoop out of
memory" and "Spark OOM" in StackOverflow.com, the Hadoop
mailing list [8], the Spark user/dev mailing lists [9], developers'
blogs, and two MapReduce books [14], [15]. We manually re-
viewed each issue and selected only the issues that satisfy: (1)
The issue is a Hadoop/Spark OOM error; 786 issues are
not OOM errors (e.g., they only contain partial keywords such as
"Hadoop Memory"). (2) The OOM error occurs in a Hadoop/Spark
application, not in other service components (e.g., the scheduler
and resource manager). In total, 276 OOM errors were selected.
These errors occur in diverse Hadoop/Spark applications, such
as raw MapReduce/Spark code, Apache Pig [10], Apache Hive
[11], Apache Mahout [16], Cloud9 [17] (a Hadoop toolkit
for text processing), GraphX [18] and MLlib [12]. Based on
the approach in Section B, we identified the root causes of
123 OOM errors (listed in Table I). The root causes of the
other 153 OOM errors are unknown. Therefore, our study
covers only these 123 OOM errors (a.k.a. failures).
B. Root cause and fix pattern identification
For each OOM error, we manually reviewed the user’s error
description and the professional answers given by experts (e.g.,
Hadoop/Spark committers from cloudera.com, experienced
developers from ebay.com, and book authors). Out of the
276 OOM errors, the root causes of 123 errors were identified
in one of the following three scenarios: (1) The experts
identified the root cause and the user accepted the expert's
professional answer. (2) The user identified the root cause
and explained it (e.g., abnormal data, abnormal configurations,
or abnormal code logic) in the error description, asking only
how to fix the error. (3) We identified the cause by reproducing
the error in our cluster and manually analyzing the root cause.
Similar to the root causes, we collected the fix patterns from
42 OOM errors, for which either the experts provided fix methods
or the users reported successful fix methods (25 errors). We then
merged similar fix methods and obtained 11 fix patterns.
C. OOM error reproduction
To fully understand the root causes and fix patterns of OOM
errors, we reproduced 43 OOM errors (35%) that have
detailed data characteristics, reproducible user code, and
OOM stack traces. Since we did not have the same datasets
as the users, we used a public dataset (Wikipedia) and
synthetic datasets (random text and a well-known benchmark
[19]) instead. The experiments were conducted on an 11-
node cluster running Hadoop-1.2 and Spark-1.2. Each node has