computation, and abstract away from the lower-level “physical”
details of the different kinds of input and output storage formats
and the appropriate partitioning of the logical computation into a
graph of MapReduces.
3.1 Core Abstractions
The central class of the FlumeJava library is PCollection<T>,
a (possibly huge) immutable bag of elements of type T. A
PCollection can either have a well-defined order (called a
sequence), or the elements can be unordered (called a collection).
Because they are less constrained, collections are more efficient
to generate and process than sequences. A PCollection<T>
can be created from an in-memory Java Collection<T>. A
PCollection<T> can also be created by reading a file in one of
several possible formats. For example, a text file can be read as a
PCollection<String>, and a binary record-oriented file can be
read as a PCollection<T>, given a specification of how to decode
each binary record into a Java object of type T. Data sets
represented by multiple file shards can be read in as a single logical
PCollection. For example:¹
PCollection<String> lines =
    readTextFileCollection("/gfs/data/shakes/hamlet.txt");
PCollection<DocInfo> docInfos =
    readRecordFileCollection("/gfs/webdocinfo/part-*",
                             recordsOf(DocInfo.class));
In this code, recordsOf(...) specifies a particular way in which
a DocInfo instance is encoded as a binary record. Other
predefined encoding specifiers are strings() for UTF-8-encoded
text, ints() for a variable-length encoding of 32-bit integers, and
pairsOf(e1,e2) for an encoding of pairs derived from the
encodings of the components. Users can specify their own custom
encodings.
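The paper names ints() and pairsOf(e1,e2) but does not give their byte layouts. The Java sketch below is only an illustration. It assumes a LEB128-style base-128 varint for ints(), with 7 payload bits per byte and the high bit set on every byte except the last. It derives a pair encoding by writing the first component's encoding followed by the second's. The Encoding interface, the Encodings class, and the toBytes/fromBytes helpers are hypothetical and are not FlumeJava's API. Map.Entry stands in for Pair.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical encoding interface; the paper does not show FlumeJava's real one.
interface Encoding<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
}

final class Encodings {
    // Assumed format: base-128 varint over the 32 bits of the int.
    static Encoding<Integer> ints() {
        return new Encoding<Integer>() {
            public void encode(Integer value, OutputStream out) throws IOException {
                int v = value;
                while ((v & ~0x7F) != 0) {
                    out.write((v & 0x7F) | 0x80); // 7 bits, "more bytes follow" flag
                    v >>>= 7;                     // unsigned shift: negatives terminate
                }
                out.write(v);                     // last byte, high bit clear
            }
            public Integer decode(InputStream in) throws IOException {
                int result = 0;
                for (int shift = 0; shift < 32; shift += 7) {
                    int b = in.read();
                    if (b < 0) throw new IOException("truncated varint");
                    result |= (b & 0x7F) << shift;
                    if ((b & 0x80) == 0) return result;
                }
                throw new IOException("varint too long");
            }
        };
    }

    // Pair encoding derived from the component encodings: first, then second.
    static <A, B> Encoding<Map.Entry<A, B>> pairsOf(Encoding<A> ea, Encoding<B> eb) {
        return new Encoding<Map.Entry<A, B>>() {
            public void encode(Map.Entry<A, B> p, OutputStream out) throws IOException {
                ea.encode(p.getKey(), out);
                eb.encode(p.getValue(), out);
            }
            public Map.Entry<A, B> decode(InputStream in) throws IOException {
                A a = ea.decode(in);
                B b = eb.decode(in);
                return new SimpleEntry<>(a, b);
            }
        };
    }

    static <T> byte[] toBytes(Encoding<T> e, T value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            e.encode(value, out);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return out.toByteArray();
    }

    static <T> T fromBytes(Encoding<T> e, byte[] bytes) {
        try {
            return e.decode(new ByteArrayInputStream(bytes));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Under this scheme, 300 encodes to the two bytes 0xAC 0x02, and every negative int takes five bytes. Concatenating the component encodings works only because the varint is self-delimiting. A component encoding without that property would need a length prefix.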
A second core class is PTable<K,V>, which represents
a (possibly huge) immutable multi-map with keys of type
K and values of type V. PTable<K,V> is a subclass of
PCollection<Pair<K,V>>, and indeed is just an unordered bag
of pairs. Some FlumeJava operations apply only to PCollections
of pairs, and in Java we choose to define a subclass to capture this
abstraction; in another language, PTable<K,V> might better be
defined as a type synonym of PCollection<Pair<K,V>>.
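Java has no type synonyms, which is why the text uses a subclass. A minimal in-memory sketch of that relationship follows. LocalCollection, LocalTable, and Pair are hypothetical stand-ins, named so as not to suggest they are FlumeJava's real classes. They only hold elements and support no operations.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical immutable pair, standing in for the Pair<K,V> used in the text.
final class Pair<K, V> {
    final K key;
    final V value;

    private Pair(K key, V value) {
        this.key = key;
        this.value = value;
    }

    static <K, V> Pair<K, V> of(K key, V value) {
        return new Pair<>(key, value);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Pair)) {
            return false;
        }
        Pair<?, ?> p = (Pair<?, ?>) o;
        return Objects.equals(key, p.key) && Objects.equals(value, p.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }
}

// In-memory stand-in for an immutable bag of elements (not FlumeJava's PCollection).
class LocalCollection<T> {
    private final List<T> elements;

    LocalCollection(List<T> elements) {
        // Defensive copy wrapped read-only, so the bag cannot be mutated.
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
    }

    List<T> elements() {
        return elements;
    }
}

// A multi-map is just a bag of pairs (duplicate keys allowed), so a subclass
// lets a table be passed anywhere a LocalCollection<Pair<K,V>> is expected.
final class LocalTable<K, V> extends LocalCollection<Pair<K, V>> {
    LocalTable(List<Pair<K, V>> pairs) {
        super(pairs);
    }
}
```

With a subclass, every LocalTable<K,V> is usable as a LocalCollection<Pair<K,V>>. A plain LocalCollection of pairs, however, is not automatically a LocalTable. A type synonym would make the two fully interchangeable.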
The main way to manipulate a PCollection is to invoke a
data-parallel operation on it. The FlumeJava library defines only
a few primitive data-parallel operations; other operations are
implemented in terms of these primitives. The core data-parallel
primitive is parallelDo(), which supports elementwise
computation over an input PCollection<T> to produce a new output
PCollection<S>. This operation takes as its main argument a
DoFn<T, S>, a function-like object defining how to map each
value in the input PCollection<T> into zero or more values to
appear in the output PCollection<S>. It also takes an indication
of the kind of PCollection or PTable to produce as a result. For
example:
PCollection<String> words =
    lines.parallelDo(new DoFn<String,String>() {
      void process(String line, EmitFn<String> emitFn) {
        for (String word : splitIntoWords(line)) {
          emitFn.emit(word);
        }
      }
    }, collectionOf(strings()));
In this code, collectionOf(strings()) specifies that
the parallelDo() operation should produce an unordered
PCollection whose String elements should be encoded using
UTF-8. Other options include sequenceOf(elemEncoding)
for ordered PCollections and tableOf(keyEncoding,
valueEncoding) for PTables. emitFn is a call-back function
FlumeJava passes to the user’s process(...) method, which
should invoke emitFn.emit(outElem) for each outElem that
should be added to the output PCollection. FlumeJava includes
subclasses of DoFn, e.g., MapFn and FilterFn, that provide
simpler interfaces in special cases. There is also a version of
parallelDo() that allows multiple output PCollections to
be produced simultaneously from a single traversal of the input
PCollection.
¹ Some of these examples have been simplified in minor ways from the real
versions, for clarity and compactness.
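The following toy sketch makes the parallelDo() contract concrete by running DoFns sequentially over an in-memory list. The DoFn and EmitFn shapes follow the example code in this section, but everything here is a hypothetical stand-in. The paper does not show how MapFn and FilterFn are defined. Their map() and accept() methods are assumptions about what the "simpler interfaces" might look like, and splitIntoWords is an invented helper.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, sequential stand-ins for the names used in the text; FlumeJava's real
// classes differ and run DoFns in parallel, possibly on remote machines.
interface EmitFn<S> {
    void emit(S value);
}

abstract class DoFn<T, S> {
    // Called once per input element; may call emitFn.emit zero or more times.
    abstract void process(T input, EmitFn<S> emitFn);
}

// Assumed shape: exactly one output per input, so the user writes only map().
abstract class MapFn<T, S> extends DoFn<T, S> {
    abstract S map(T input);

    @Override
    final void process(T input, EmitFn<S> emitFn) {
        emitFn.emit(map(input));
    }
}

// Assumed shape: keep or drop each input, so the user writes only accept().
abstract class FilterFn<T> extends DoFn<T, T> {
    abstract boolean accept(T input);

    @Override
    final void process(T input, EmitFn<T> emitFn) {
        if (accept(input)) {
            emitFn.emit(input);
        }
    }
}

final class LocalParallelDo {
    // Single-threaded parallelDo over an in-memory list: feeds every element to
    // the DoFn and collects whatever it emits, in order.
    static <T, S> List<S> parallelDo(List<T> input, DoFn<T, S> doFn) {
        List<S> output = new ArrayList<>();
        EmitFn<S> emitFn = output::add;
        for (T element : input) {
            doFn.process(element, emitFn);
        }
        return output;
    }

    // Hypothetical splitIntoWords: lower-cases and splits on runs of non-letters.
    static List<String> splitIntoWords(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.toLowerCase().split("[^\\p{L}]+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }
}
```

The sketch has no counterpart to arguments such as collectionOf(strings()). Nothing is serialized in memory, so no output encoding is needed. A distributed implementation needs one to write its results.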
parallelDo() can be used to express both the map and reduce
parts of MapReduce. Since they will potentially be distributed
remotely and run in parallel, DoFn functions should not access
any global mutable state of the enclosing Java program. Ideally,
they should be pure functions of their inputs. It is also legal for
DoFn objects to maintain local instance variable state, but users
should be aware that there may be multiple DoFn replicas operating
concurrently with no shared state. These restrictions are shared by
MapReduce as well.
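A small simulation illustrates the warning about replicas. The paper does not say how replicas are created. This sketch assumes each worker receives its own fresh copy of the function, modeled with a Supplier and a thread pool. NumberingFn and ReplicaDemo are hypothetical names. Each replica keeps a private counter, so the numbers it assigns are unique only within that replica, never across the whole data set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical DoFn-like object with local instance state: a running counter.
final class NumberingFn {
    private int counter = 0; // local state, never shared between replicas

    String process(String element) {
        return (counter++) + ":" + element;
    }
}

final class ReplicaDemo {
    // Runs one fresh replica per shard on its own thread, mimicking a runner
    // that copies the function to several workers.
    static List<List<String>> runReplicas(List<List<String>> shards,
                                          Supplier<NumberingFn> newReplica)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(shards.size());
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (List<String> shard : shards) {
                futures.add(pool.submit(() -> {
                    NumberingFn replica = newReplica.get(); // per-worker copy
                    List<String> out = new ArrayList<>();
                    for (String e : shard) {
                        out.add(replica.process(e));
                    }
                    return out;
                }));
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Each shard's numbering restarts at 0. Code that assumes one counter spanning the whole PCollection would therefore be wrong once the work is split across replicas.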
A second primitive, groupByKey(), converts a multi-map of
type PTable<K,V> (which can have many key/value pairs with the
same key) into a uni-map of type PTable<K, Collection<V>>
where each key maps to an unordered, plain Java Collection of
all the values with that key. For example, the following computes
a table mapping URLs to the collection of documents that link to
them:
PTable<URL,DocInfo> backlinks =
    docInfos.parallelDo(new DoFn<DocInfo,
                                 Pair<URL,DocInfo>>() {
      void process(DocInfo docInfo,
                   EmitFn<Pair<URL,DocInfo>> emitFn) {
        for (URL targetUrl : docInfo.getLinks()) {
          emitFn.emit(Pair.of(targetUrl, docInfo));
        }
      }
    }, tableOf(recordsOf(URL.class),
               recordsOf(DocInfo.class)));
PTable<URL,Collection<DocInfo>> referringDocInfos =
    backlinks.groupByKey();
groupByKey() captures the essence of the shuffle step of
MapReduce. There is also a variant that allows specifying a sorting order
for the collection of values for each key.
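An in-memory sketch of groupByKey() and its sorted variant follows. It uses java.util.Map.Entry in place of Pair, and plain strings in place of URL and DocInfo for the backlinks example. LocalGroupByKey and its methods are hypothetical. The sorted variant here simply sorts each group after grouping. MapReduce-based systems often push such ordering into the shuffle's own sort (a "secondary sort"), but the paper does not say how FlumeJava does it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LocalGroupByKey {
    // Multi-map (bag of pairs, keys may repeat) to uni-map from each distinct
    // key to all of its values. LinkedHashMap keeps first-seen key order only
    // to make output deterministic; the paper's groups are unordered.
    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Variant that also orders each key's values (a simplification: sorts
    // after grouping rather than during the shuffle).
    static <K, V> Map<K, List<V>> groupByKeySorted(List<Map.Entry<K, V>> pairs,
                                                   Comparator<? super V> order) {
        Map<K, List<V>> grouped = groupByKey(pairs);
        for (List<V> values : grouped.values()) {
            values.sort(order);
        }
        return grouped;
    }

    // The backlinks example with strings standing in for URL and DocInfo:
    // emit (target, source) for every link, then group by target.
    static Map<String, List<String>> backlinks(Map<String, List<String>> linksByDoc) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (Map.Entry<String, List<String>> doc : linksByDoc.entrySet()) {
            for (String target : doc.getValue()) {
                pairs.add(new SimpleEntry<>(target, doc.getKey()));
            }
        }
        return groupByKey(pairs);
    }
}
```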
A third primitive, combineValues(), takes an input
PTable<K, Collection<V>> and an associative combining
function on Vs, and returns a PTable<K, V> where each input
collection of values has been combined into a single output value.
For example:
PTable<String,Integer> wordsWithOnes =
    words.parallelDo(
        new DoFn<String, Pair<String,Integer>>() {
          void process(String word,
                       EmitFn<Pair<String,Integer>> emitFn) {
            emitFn.emit(Pair.of(word, 1));
          }
        }, tableOf(strings(), ints()));
PTable<String,Collection<Integer>>
    groupedWordsWithOnes = wordsWithOnes.groupByKey();
PTable<String,Integer> wordCounts =
    groupedWordsWithOnes.combineValues(SUM_INTS);
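Run in memory, the word-count pipeline reduces to a few loops. The sketch below is a hypothetical stand-in, not FlumeJava's implementation. A java.util.function.BinaryOperator plays the role of the combining function, SUM_INTS is defined here as Integer::sum, and a LinkedHashMap plays the role of the grouped PTable.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class LocalCombineValues {
    // Hypothetical stand-in for SUM_INTS: integer addition is associative.
    static final BinaryOperator<Integer> SUM_INTS = Integer::sum;

    // Folds each key's collection of values into a single value with `combine`.
    static <K, V> Map<K, V> combineValues(Map<K, Collection<V>> grouped,
                                          BinaryOperator<V> combine) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, Collection<V>> entry : grouped.entrySet()) {
            Iterator<V> it = entry.getValue().iterator();
            V acc = it.next(); // groups produced by groupByKey are never empty
            while (it.hasNext()) {
                acc = combine.apply(acc, it.next());
            }
            result.put(entry.getKey(), acc);
        }
        return result;
    }

    // The word-count pipeline from the text, run in memory:
    // emit (word, 1), group by word, then combine each group with SUM_INTS.
    static Map<String, Integer> wordCounts(List<String> words) {
        Map<String, Collection<Integer>> grouped = new LinkedHashMap<>();
        for (String word : words) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        return combineValues(grouped, SUM_INTS);
    }
}
```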
combineValues() is semantically just a special case of
parallelDo(), but the associativity of the combining function
allows it to be implemented via a combination of a MapReduce
combiner (which runs as part of each mapper) and a MapReduce
reducer (to finish the combining), which is more efficient than doing
all the combining in the reducer.
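Splitting the combining into two stages shows why associativity matters. In the hypothetical sketch below, each simulated mapper first pre-combines its own pairs with combineLocally, which plays the combiner role. The reducer then merges the partial results. The totals are the same, but fewer values are shipped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class CombinerSketch {
    // Map-side combiner: a mapper pre-combines its own pairs, so it ships at
    // most one partial value per distinct key instead of one per occurrence.
    static <K, V> Map<K, V> combineLocally(List<Map.Entry<K, V>> mapperOutput,
                                           BinaryOperator<V> combine) {
        Map<K, V> partial = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : mapperOutput) {
            partial.merge(e.getKey(), e.getValue(), combine);
        }
        return partial;
    }

    // Reducer: finishes the combining over the partial values from all mappers.
    static <K, V> Map<K, V> reduce(List<Map<K, V>> partials, BinaryOperator<V> combine) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map<K, V> partial : partials) {
            for (Map.Entry<K, V> e : partial.entrySet()) {
                result.merge(e.getKey(), e.getValue(), combine);
            }
        }
        return result;
    }
}
```

The sketch merges partials in mapper order. A real shuffle may deliver them in any order. That is harmless for a function such as integer addition, which is commutative as well as associative.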
A fourth primitive, flatten(), takes a list of
PCollection<T>s and returns a single PCollection<T> that