It has been some years since Google published the paper “MapReduce: Simplified Data Processing on Large Clusters” in 2004. In this paper Google presented MapReduce, a programming model and associated implementation for solving parallel computation problems on large-scale data. This model is based on the functional primitives “map” and “reduce” present in LISP and other functional languages.
Today, Hadoop, the “de facto” open-source implementation of MapReduce, is used by a wide variety of companies, institutions and universities. The massive usage of this programming model has led to the creation of multiple associated tools (which have come to be known as the Hadoop ecosystem) and even to specialized companies like Cloudera that train programmers to use it. Part of the success of such tools and companies lies in the now-evident difficulty and steep learning curve of MapReduce, as it was originally defined, when applied to practical problems.
In this post we’ll review the MapReduce model proposed by Google in 2004 and propound another one called Tuple MapReduce. We’ll see that this new model is a generalization of the first and we’ll explain what advantages it has to offer. We’ll provide a practical example and conclude by discussing when the implementation of Tuple MapReduce is advisable.
The usage of MapReduce for solving problems like the typical “word count” is advisable and even intuitive. However, for many real-world problems, it can be excessively complex to code a solution based on MapReduce (for a reference on this matter, see this post about the shortcomings of the Hadoop MapReduce API).
MapReduce is currently seen as a low-level paradigm on top of which more intuitive, easier-to-use high-level tools must be built. However, another way of solving MapReduce’s shortcomings is to reformulate it.
If MapReduce were formulated differently, many problems would be easier to code. The high-level tools that could arise from it would also be easier to code.
This is the motivation that has led us to pose and formulate Tuple MapReduce.
The MapReduce model proposed by Google (and the one that Hadoop implements) can be conceptualized as:
The map function processes a certain (key, value) pair and emits a certain number of (key, value) pairs. The reduce function processes values grouped by the same key and emits another set of (key, value) pairs as output.
The original MapReduce works with (key, value) pairs and specifies that the reduce function receives the values grouped by key.
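The (key, value) model described above can be sketched in a few lines of single-process Python. This is only an illustration of the programming model, not of Hadoop or of Google's implementation: the names `map_reduce`, `word_count_map` and `word_count_reduce` are hypothetical, and sorting the keys is an assumption made solely to keep the output deterministic.

```python
from collections import defaultdict

def map_reduce(records, map_fn, reduce_fn):
    """Single-process sketch: map each (key, value), group by key, reduce each group."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    output = []
    for out_key in sorted(groups):  # sorted only to make the result deterministic
        output.extend(reduce_fn(out_key, groups[out_key]))
    return output

def word_count_map(doc_id, text):
    # Emits one (word, 1) pair per word in the document.
    for word in text.split():
        yield (word, 1)

def word_count_reduce(word, counts):
    # Receives every value emitted for the same key.
    yield (word, sum(counts))

documents = [("doc1", "a rose is a rose"), ("doc2", "a daisy")]
word_counts = map_reduce(documents, word_count_map, word_count_reduce)
```

Note that the reducer only learns which key its values belong to; nothing in the model says in which order those values arrive.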
Now we will show an extended MapReduce model, Tuple MapReduce, which we can formalize as:
In this case, the map function processes a tuple as input and emits a certain number of tuples as output. These tuples are made up of “n” fields, out of which “s” fields are used to sort and “g” fields are used to group by. This diagram shows how sorting and grouping are done in greater detail:
In the reduce function, for each group, we receive a group tuple with “g” fields and a list of tuples for that group. Finally we’ll emit a certain number of tuples as output.
Tuple MapReduce extends the idea of MapReduce in order to be able to work with an arbitrary number of fields. It specifies how to sort and group in order to receive the tuples in the reduce function in a certain order. This formulation simplifies programming in several use cases.
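To make the sort and group contract concrete, here is a minimal in-memory sketch in Python. It is not the Datasalt implementation: `tuple_map_reduce` and its parameters are hypothetical names, tuples are modelled as dictionaries, and the sketch assumes that the group-by fields form a prefix of the sort-by fields.

```python
from itertools import groupby

def tuple_map_reduce(tuples, map_fn, reduce_fn, sort_by, group_by):
    """In-memory sketch: map, sort by the 's' fields, group by the 'g' fields, reduce."""
    # Assumption of this sketch: the group-by fields are a prefix of the sort-by fields.
    if sort_by[:len(group_by)] != group_by:
        raise ValueError("group_by must be a prefix of sort_by")
    intermediate = [out for t in tuples for out in map_fn(t)]
    intermediate.sort(key=lambda t: tuple(t[f] for f in sort_by))
    output = []
    for group_tuple, group in groupby(
            intermediate, key=lambda t: tuple(t[f] for f in group_by)):
        # reduce_fn receives the group tuple and the group's tuples in sorted order.
        output.extend(reduce_fn(group_tuple, group))
    return output

def identity_map(t):
    yield t

def dates_reduce(group_tuple, tuples):
    # Emits the dates in the order in which the reducer sees them.
    yield (group_tuple[0], [t["date"] for t in tuples])

register = [
    {"url": "b.com", "date": "2012-01-02", "visits": 7},
    {"url": "a.com", "date": "2012-01-02", "visits": 5},
    {"url": "a.com", "date": "2012-01-01", "visits": 3},
]
ordered = tuple_map_reduce(register, identity_map, dates_reduce,
                           sort_by=["url", "date"], group_by=["url"])
```

Each group arrives at the reducer already ordered by the remaining sort fields, which is exactly what the plain (key, value) model leaves unspecified.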
Example use case
In Google’s MapReduce paper, we find a pseudo-code example of a typical “word count”. However, as we mentioned above, many real-world problems are difficult to express in MapReduce. These problems usually involve compound data (tables) and/or need to be grouped and sorted in some specific way. Let’s see an example.
Imagine we have a register of daily unique visits for each URL: [“url”, “date”, “visits”]. We want to calculate the cumulative number of unique visits up to each date from that register. In pseudo-code, the user could write a program that is something like:
map(Tuple tuple):
    EmitIntermediate(tuple);

reduce(Tuple groupTuple, Iterator tuples):
    int count = 0;
    for each tuple in tuples:
        count += tuple.get("visits");
        Emit(NewTuple(tuple.get("url"), tuple.get("date"), count));
The user would need to configure the Tuple MapReduce implementation in the following manner:
configureSortBy(TupleFields("url", "date"));
configureGroupBy(TupleFields("url"));
The map function is quite simple: it just emits the records as they are processed. Because the user doesn’t need to create a key and a value, emitting a compound record is extremely easy. Because the process has been configured to sort by “url, date” and group by “url”, each URL group is guaranteed to receive its records sorted by date.
The reduce function keeps a running visits counter for each URL. For each record of the group, we add the visits from that day and emit the accumulated counter for that URL and day.
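For readers who want to run the example, the pseudo-code can be translated into plain Python as follows. This is a single-process sketch: `sort_key` and `group_key` stand in for the `configureSortBy` and `configureGroupBy` calls, the sample data is made up for illustration, and dates are assumed to be ISO strings so that sorting them as text sorts them chronologically.

```python
from itertools import groupby
from operator import itemgetter

# Made-up sample register, deliberately out of order.
register = [
    {"url": "b.com", "date": "2012-01-02", "visits": 1},
    {"url": "a.com", "date": "2012-01-03", "visits": 2},
    {"url": "a.com", "date": "2012-01-01", "visits": 3},
    {"url": "b.com", "date": "2012-01-01", "visits": 10},
    {"url": "a.com", "date": "2012-01-02", "visits": 5},
]

def map_fn(t):
    # Emits each record unchanged.
    yield t

def reduce_fn(group_tuple, tuples):
    # Running total of visits for one URL; tuples arrive sorted by date.
    count = 0
    for t in tuples:
        count += t["visits"]
        yield (t["url"], t["date"], count)

sort_key = itemgetter("url", "date")   # stand-in for configureSortBy
group_key = itemgetter("url")          # stand-in for configureGroupBy

intermediate = sorted((out for t in register for out in map_fn(t)), key=sort_key)
cumulative = [
    result
    for url, group in groupby(intermediate, key=group_key)
    for result in reduce_fn((url,), group)
]
```

With this data, `cumulative` holds the running totals 3, 8 and 10 for a.com, followed by 10 and 11 for b.com.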
Other applications similar to this example would include: calculating increments in visits between consecutive days, calculating moving averages (e.g. average number of visits within the 30 days prior to each date), calculating unique visits in different time periods (year, month, week, …). All of these applications are quite common nowadays, and yet they are quite difficult to code in a simple and scalable way using MapReduce. In this blog post you’ll find an example of such complexity.
If we think about it, Tuple MapReduce is a more general model than MapReduce. MapReduce can be seen as a specific case of Tuple MapReduce in which we only work with tuples of two fields and restrict both sorting and grouping to a single field (the first one, which would be the “key” in the original MapReduce).
Therefore we want to emphasize that Tuple MapReduce allows us to do the same things as MapReduce, while greatly simplifying the way that we code and understand it.
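The claim that classic MapReduce is a special case can be checked with a small sketch. Here tuples are plain Python tuples addressed by position, and `classic_map_reduce` (a hypothetical name, like `tuple_map_reduce`) is nothing more than the tuple model restricted to two fields, sorted and grouped by field 0.

```python
from itertools import groupby

def tuple_map_reduce(tuples, map_fn, reduce_fn, sort_by, group_by):
    """In-memory sketch of Tuple MapReduce; fields are addressed by position."""
    intermediate = [out for t in tuples for out in map_fn(t)]
    intermediate.sort(key=lambda t: tuple(t[f] for f in sort_by))
    output = []
    for group_tuple, group in groupby(
            intermediate, key=lambda t: tuple(t[f] for f in group_by)):
        output.extend(reduce_fn(group_tuple, group))
    return output

def classic_map_reduce(records, map_fn, reduce_fn):
    """Classic (key, value) MapReduce as two-field tuples grouped by field 0."""
    def tuple_map(t):
        # Field 0 plays the role of the key, field 1 the role of the value.
        yield from map_fn(t[0], t[1])

    def tuple_reduce(group_tuple, tuples):
        yield from reduce_fn(group_tuple[0], [value for _, value in tuples])

    return tuple_map_reduce(records, tuple_map, tuple_reduce,
                            sort_by=[0], group_by=[0])

def word_count_map(doc_id, text):
    for word in text.split():
        yield (word, 1)

def word_count_reduce(word, counts):
    yield (word, sum(counts))

documents = [("doc1", "a rose is a rose"), ("doc2", "a daisy")]
word_counts = classic_map_reduce(documents, word_count_map, word_count_reduce)
```

The word count functions are written exactly as they would be for the (key, value) model; only the thin adapter knows that tuples are involved.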
Tuple MapReduce can be implemented based on the same foundations as any current MapReduce architecture, and does not, in itself, involve changes in how the system is distributed or coordinated, but rather only the way that the user interfaces with the system.
Nowadays, we know that a quite common pattern in parallel data processing is joining multiple heterogeneous data sources. This problem is not inherently solved by the original formulation of MapReduce. Tuple MapReduce can inherently be generalized to allow joins between multiple data sources. Taking two data sources as an example, we can formalize a join in Tuple MapReduce in the following simplified manner:
The group tuple ‘g’ must be a prefix of all the tuples emitted from all data sources. In the reducer we’ll receive several lists of tuples for each group tuple: one list for each data source. These lists would arrive in a fixed order, for instance the order in which the different data sources were defined.
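A join between two sources can be sketched in the same in-memory style. This is an illustration under assumptions, not the Tuple-Join MapReduce implementation: `tuple_join_map_reduce` is a hypothetical name, the sample data is made up, each source is given as a (tuples, map function) pair, and the per-source lists are handed to the reducer in the order in which the sources were defined.

```python
from itertools import groupby

def tuple_join_map_reduce(sources, reduce_fn, group_by):
    """sources: list of (tuples, map_fn) pairs; every emitted tuple has the group_by fields."""
    tagged = []
    for source_id, (tuples, map_fn) in enumerate(sources):
        for t in tuples:
            for out in map_fn(t):
                group_tuple = tuple(out[f] for f in group_by)
                tagged.append((group_tuple, source_id, out))
    # Sort by group tuple first, then by the order in which sources were defined.
    tagged.sort(key=lambda item: (item[0], item[1]))
    output = []
    for group_tuple, group in groupby(tagged, key=lambda item: item[0]):
        lists = [[] for _ in sources]  # one list of tuples per data source
        for _, source_id, out in group:
            lists[source_id].append(out)
        output.extend(reduce_fn(group_tuple, lists))
    return output

def identity_map(t):
    yield t

def join_reduce(group_tuple, lists):
    # lists[0] comes from the first source, lists[1] from the second.
    category_tuples, visit_tuples = lists
    total = sum(t["visits"] for t in visit_tuples)
    for c in category_tuples:
        yield (group_tuple[0], c["category"], total)

categories = [
    {"url": "a.com", "category": "news"},
    {"url": "b.com", "category": "shop"},
]
visits = [
    {"url": "a.com", "date": "2012-01-01", "visits": 3},
    {"url": "b.com", "date": "2012-01-01", "visits": 10},
    {"url": "a.com", "date": "2012-01-02", "visits": 5},
]
joined = tuple_join_map_reduce(
    [(categories, identity_map), (visits, identity_map)],
    join_reduce, group_by=["url"])
```

The two sources have different fields, yet they share the group field “url”, which is all the join requires.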
In this post we have presented a new MapReduce model, Tuple MapReduce, and we have shown its benefits and virtues. We have generalized it in order to allow joins between different data sources (Tuple-Join MapReduce). We have noted that it allows the same things to be done as the MapReduce we already know, while making it much simpler to learn and use.
We believe that an implementation of Tuple MapReduce would be advisable and that it could act as a replacement for the original MapReduce. Rather than matching the efficiency of the high-level tools that have been built on top of MapReduce, this implementation would match the efficiency of current MapReduce implementations.
At Datasalt we are working on an implementation of Tuple MapReduce for Hadoop that we will open-source for the community in the near future.
UPDATE: We have presented a paper about Tuple MapReduce at the IEEE International Conference on Data Mining (ICDM 2012). You can download it here.