
MapReduce & Hadoop API revised

Nowadays, Hadoop has become the key technology behind what has come to be known as “Big Data”, and it has certainly worked hard to earn that position. It is a mature technology that has been used successfully in countless projects. But now, with that experience behind us, it is time to take stock of the foundations it is built on, particularly its interface. This article discusses some of the weaknesses of both MapReduce and Hadoop, which we at Datasalt will attempt to resolve with an open-source project that we will be releasing soon.

MapReduce

MapReduce is the distributed computing paradigm implemented by Hadoop. Its interface is based on implementing two functions, Map and Reduce, which process key/value pairs. The pairs emitted by the Map function are grouped by key, and each group is received in a single call to the Reduce function.

Experience has shown us that the data-processing model proposed by MapReduce makes a number of tasks that are common to almost any Big Data project unnecessarily difficult. Let’s take a look at a few of them.

Compound records

Key/value pairs are sufficient for implementing the typical WordCount, for example, since only two fields per record are needed: a string for the word and an integer for the counter.
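To make this concrete, a minimal WordCount sketch against the standard org.apache.hadoop.mapreduce API might look like the following (the class names are ours); the word is the key and the counter is the value, so the model fits perfectly:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // Map: emit (word, 1) for every token in the input line
  public static class TokenMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      for (String token : line.toString().split("\\s+")) {
        if (token.isEmpty()) {
          continue;
        }
        word.set(token);
        context.write(word, ONE);
      }
    }
  }

  // Reduce: sum the counts received for each word
  public static class SumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : counts) {
        sum += count.get();
      }
      context.write(word, new IntWritable(sum));
    }
  }
}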

However, the vast majority of cases call for more than two fields per record, and this does not fit the key/value format. To solve this, compound records must be created. There are two ways to do this in Hadoop:

  1. By creating your own compound data type that implements Writable. This solution is usually highly complex and time-consuming, so it is not feasible in a project with many types of data.
  2. By using a serialization library such as Thrift or Avro.

The latter solution is less efficient, but in general it is preferable to the first option. Even so, it is not enough: you usually end up creating several different record types for a single piece of data, just to adapt it to whatever the key or the value needs to hold at each step.
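To see why the first option is so complex and time-consuming, here is a sketch of a simple three-field record written as a custom Writable (the PersonRecord class and its fields are hypothetical); every field has to be serialized and deserialized by hand, and if the record were used as a key it would additionally have to implement WritableComparable, hashCode() and equals():

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class PersonRecord implements Writable {
  private String name;
  private int age;
  private double expense;

  @Override
  public void write(DataOutput out) throws IOException {
    // each field is written by hand, in a fixed order
    out.writeUTF(name);
    out.writeInt(age);
    out.writeDouble(expense);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // and read back in exactly the same order
    name = in.readUTF();
    age = in.readInt();
    expense = in.readDouble();
  }
}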

In short, the key/value model is too restrictive and therefore hinders development. A more flexible option is needed.

Sorting

The MapReduce model does not define the possibility of receiving the values sorted in a particular way in each call to the Reduce function. However, Hadoop does let us obtain this sorting by correctly combining:

  • A key that contains the values for secondary sorting
  • A specific Partitioner
  • A specific group comparison function

Secondary Sorting in Hadoop
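As a rough sketch of what that wiring looks like, assuming a hypothetical composite key EventKey that holds a user ID plus a timestamp (the Partitioner and both comparators also have to be written by hand):

// Partition and group by user ID only, but sort by (user ID, timestamp),
// so each reduce() call receives one user's events in time order.
job.setMapOutputKeyClass(EventKey.class);
job.setPartitionerClass(UserIdPartitioner.class);
job.setSortComparatorClass(UserIdTimestampComparator.class);
job.setGroupingComparatorClass(UserIdGroupingComparator.class);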

In other words, a mess. Unfortunately, however, this is one of the most common patterns in Hadoop programming: you want the value records to reach the Reducer in a certain order. For example, this is the case when you want to analyze the events occurring in a one-hour window in their order of occurrence, so you need the Reducer to receive the events already sorted by time.

One alternative would be to sort them in memory in the Reduce function itself. This would only work if the records received by the Reducer fit into memory. But weren’t we talking about building scalable systems?

Getting the records sorted at the Reducer shouldn’t be so difficult.

Joins

Doing joins between two datasets with different schemas is almost essential when programming with MapReduce. For example, if you want to calculate the average expense per age group, you need to do a join between the People and Sales datasets.

To do this in Hadoop, you would have to create two Map functions, one for the People dataset and another for the Sales dataset. Both have to emit the person ID, which is present in both People and Sales, as the key; but what do they emit as the value? Hadoop has to receive the same type of data in the value, regardless of where it comes from, so we would have to create a new kind of record that can hold either a Person or a Sale: the same data type sometimes carries a Person and other times a Sale. Furthermore, some sorting is needed so that the Person record always reaches the Reduce function before the multiple Sale records made by that person (since this is a 1-n join). I won’t go into further detail, but it is easy to see that this is really complicated, considering how common the pattern is. There are other tricks for doing this, but none of them are satisfactory.
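As an illustration, here is a sketch of the kind of “tagged” value record this forces you to write (PersonOrSale, Person and Sale are hypothetical Writable classes); on top of it you still need the composite key, Partitioner and grouping comparator described in the previous section so that the Person arrives before its Sales:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Value type for the join: carries either a Person or a Sale, plus a tag
// saying which of the two it actually is.
public class PersonOrSale implements Writable {
  public static final byte PERSON = 0;
  public static final byte SALE = 1;

  private byte tag;
  private Person person; // set when tag == PERSON
  private Sale sale;     // set when tag == SALE

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeByte(tag);
    if (tag == PERSON) {
      person.write(out);
    } else {
      sale.write(out);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    tag = in.readByte();
    if (tag == PERSON) {
      person = new Person();
      person.readFields(in);
    } else {
      sale = new Sale();
      sale.readFields(in);
    }
  }
}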

There must be a simpler way of doing a Reduce join and other calculations with heterogeneous data sources.

Hadoop

Instantiation vs. serialization for the processing logic

In Hadoop, the processing logic moves to where the data are. The mechanism Hadoop uses for this is to send the Java class that contains the logic to all the nodes and to create a new instance of it for each task. For example, the Map and Reduce functions to be applied are configured like this:

job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);

This method works well in most cases, but it makes it hard to create modular Map and Reduce functions. The problem is that, with the current mechanism, the only way of transmitting state or configuration to your Map and Reduce functions is through the Configuration object, which is a key/value map of Strings.
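For example, to parameterize a (hypothetical) RegexpFilterMapper with a regular expression today, the driver has to push the expression into the Configuration as a string, e.g. job.getConfiguration().set("filter.regexp", "[a-z]*@[a-z]*"), and the Mapper has to rebuild its state from that string in setup():

import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RegexpFilterMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {
  private Pattern pattern;

  @Override
  protected void setup(Context context) {
    // State arrives as a plain string in the Configuration and has to be
    // rebuilt here, on every task.
    pattern = Pattern.compile(context.getConfiguration().get("filter.regexp"));
  }

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    if (pattern.matcher(line.toString()).find()) {
      context.write(line, NullWritable.get());
    }
  }
}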

We feel it would be simpler and more robust if the logic were sent by serializing previously created instances. That would allow us to do things like the following:

job.setMapperClass(new RegexpFilter("[a-z]*@[a-z]*"));

Much more elegant and simple than using the Configuration object to pass the regular expression.
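As a purely hypothetical sketch of the idea (nothing like this exists in the current Hadoop API, where setMapperClass() only accepts a Class), the RegexpFilter instance could be an ordinary serializable Mapper that carries its own configuration, shipped to the tasks by serialization instead of being re-instantiated from a class name:

import java.io.IOException;
import java.io.Serializable;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical: the framework would serialize this instance in the driver,
// send it to the tasks and deserialize it there, constructor arguments included.
public class RegexpFilter extends Mapper<LongWritable, Text, Text, NullWritable>
    implements Serializable {

  private final String regexp;
  private transient Pattern pattern; // rebuilt on the task side

  public RegexpFilter(String regexp) {
    this.regexp = regexp;
  }

  @Override
  protected void setup(Context context) {
    // the state comes from the instance itself, not from the Configuration
    pattern = Pattern.compile(regexp);
  }

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    if (pattern.matcher(line.toString()).find()) {
      context.write(line, NullWritable.get());
    }
  }
}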

Multiple output files

Each Hadoop Job usually receives several input files and generates several output files. However, Hadoop’s default API is designed to emit results to a single output. To emit several output files, the MultipleOutputs class must be used. This is sufficient in most cases, but it is neither particularly easy to use nor a perfect fit.
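As an illustration, emitting an extra, named output with MultipleOutputs currently looks roughly like this (the “stats” output name and the reducer are hypothetical); the named output has to be declared in the driver, the MultipleOutputs instance has to be created in setup() and it has to be closed in cleanup():

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// In the driver, the extra output has to be declared first, e.g.:
//   MultipleOutputs.addNamedOutput(job, "stats", TextOutputFormat.class,
//       Text.class, IntWritable.class);
public class StatsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private MultipleOutputs<Text, IntWritable> extraOutputs;

  @Override
  protected void setup(Context context) {
    extraOutputs = new MultipleOutputs<Text, IntWritable>(context);
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));                // main output
    extraOutputs.write("stats", key, new IntWritable(sum));  // extra "stats" files
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    extraOutputs.close(); // must be closed, or the extra files are not flushed
  }
}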

At any rate, we believe there should be native support for emitting several output files, making this kind of job easier.

Alternatives

The alternative interface for Hadoop provided by Avro can be used (see our post on this topic). However, it does not solve all the problems discussed in this article.

There are also higher level tools such as Cascading, Hive and Pig which attempt to solve these problems. However, as higher level tools, they do not allow us to take advantage of all Hadoop’s capacities.

Conclusion

In sum, we need an alternative, low-level Hadoop API that enables all kinds of optimizations while simplifying development for the most common patterns. At Datasalt we are working on this, and we will be releasing the project to the community shortly.



2 Comments on "MapReduce & Hadoop API revised"

  1. I’d be very curious to have you elaborate how Cascading prevents you from “taking advantage of all Hadoop’s capacities”?

    What would those be that Cascading limits access to?

    chris

  2. Hi Chris,

    In this post we wanted to highlight the problems of the plain, low level Hadoop Map/Reduce API.

    We understand that Cascading, Hive and Pig are higher level tools built on top of this API and that they are not meant to be a complete replacement for it. On the other hand such tools are fantastic and capable of doing things that otherwise would be a nightmare to achieve with the plain Hadoop API (things like flow management, for instance).

    We think using one of these always comes with a tradeoff. Many things will be easier to do, but certain things will be harder to achieve as well. There will also be some overhead, and therefore a loss in performance for certain tasks (this is expected, since each API has to handle some degree of complexity).

    We like Cascading and think it is a great API and probably a great fit for many, many problems. On the other hand we prefer to use plain Map/Reduce for certain problems for various reasons:
    – Performance: Having full control on the intermediate serialization, partitioning, key comparison and how the business logic is exactly executed in Mapper, Combiner and Reducer is key to achieving the most out of Hadoop’s potential performance.
    – Flexibility: Being able to implement arbitrary business logic during the Map/Reduce phases allows us to do things that, at least, do not seem possible with such tools. For instance, joining and sorting multiple datasets by a common key and streaming through each group’s values without keeping any of the datasets in memory or spilling them to disk.

    I could elaborate more on particular problems that we had found when working with Cascading, Hive or Pig, but I also understand that these are fast-evolving tools and that certain observations made on them in the past might not be valid anymore.

    What I wanted to highlight is that Cascading (and the others) are not a replacement for the Hadoop Map/Reduce API, but rather different (non-Map/Reduce) APIs, which in our opinion makes them not the best fit for some fine-grained, performance-critical problems.

    Therefore we suggest that the Hadoop community would benefit from a replacement for the standard Hadoop Map/Reduce API, one that coexists with these other tools. This new API should be very close in efficiency to the standard Hadoop Map/Reduce API and should remain just as flexible, making Hadoop easier to learn for beginners and making it even easier to build new APIs on top of it.
