
Front-end view generation with Hadoop

One of the most common uses for Hadoop is building “views”. The typical case is a website whose front-end serves data from a search index. Why would we use Hadoop to generate the index that the website serves? There are several reasons:

  • Parallelism: When the front-end needs to serve a lot of data, it is a good idea to divide that data into “shards”. With Hadoop we can parallelize the creation of each of these shards, so that both the generation of the view and the serving of it scale efficiently.
  • Efficiency: To maximize the efficiency and speed of a front-end, it is convenient to separate the generation of the view from the serving of it. The generation is done by a back-end process whereas the serving is done by the front-end; in this way we free the front-end from the load that indexing would otherwise put on it.
  • Atomicity: It is often convenient to have a method for generating and deploying views atomically. If the deployment fails, we can easily roll back to a previous complete version. If the generation itself went wrong, we can always generate a new full view in which the error is fixed in every record. Hadoop allows us to generate views atomically because it is batch-oriented, and some search engines / databases allow atomic deployment by hot-swapping their data.

The architecture to follow is shown in the diagram below:

A Hadoop process receives all the data that the view will contain. This process is parallelized in order to generate as many shards as the front-end needs.

Usage examples

Use case example #1

Apache SOLR

In spite of the lack of clear information available, using patch SOLR-1301 is quite simple. If our Map/Reduce receives the data to be indexed, we just need to implement a Reducer (we do not need a Mapper) in the following way:

public static class SolrBatchIndexerReducer extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    // Makes this Reducer's context available to the patch's SolrRecordWriter.
    SolrRecordWriter.addReducerContext(context);
  }
}

The next thing we need to do is to implement a SolrDocumentConverter that will convert the (key, value) data that our Map/Reduce will receive into indexable documents (SolrInputDocument). Let’s assume that our Map/Reduce receives [Text, Text] pairs with a key and value:

public class MyDocumentConverter extends SolrDocumentConverter<Text, Text> {
  @Override
  public Collection<SolrInputDocument> convert(Text key, Text value) {
    ArrayList<SolrInputDocument> list = new ArrayList<SolrInputDocument>();
    SolrInputDocument document = new SolrInputDocument();
    // Copy the values as Strings: Hadoop reuses its Text instances between calls.
    document.addField("key", key.toString());
    document.addField("value", value.toString());
    list.add(document);
    return list;
  }
}

Caution! We need a SOLR configuration (schema.xml) whose fields match the documents we are building here: the patch will instantiate an EmbeddedSolrServer that reads this configuration. Finally, in order to configure our Job, we have to add the following lines in addition to the usual things:

// ...
SolrDocumentConverter.setSolrDocumentConverter(MyDocumentConverter.class, job.getConfiguration());
job.setReducerClass(SolrBatchIndexerReducer.class);
job.setOutputFormatClass(SolrOutputFormat.class);
File solrHome = new File("solr-conf/my-core");
SolrOutputFormat.setupSolrHomeCache(solrHome, job.getConfiguration());
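
To put these lines in context, the whole driver might look like the sketch below. The input format, paths and number of Reducers are illustrative assumptions (and the org.apache.solr.hadoop package name may vary with the patch version); only the SOLR-1301-specific calls are the ones shown above, and the Reducer and converter classes are the ones defined earlier.

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.solr.hadoop.SolrDocumentConverter;
import org.apache.solr.hadoop.SolrOutputFormat;

public class SolrViewBuilder {

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "solr-view-builder");
    job.setJarByClass(SolrViewBuilder.class);

    // Assumption: the (key, value) pairs to index are stored as a SequenceFile of Text/Text.
    job.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // The default (identity) Mapper is enough: all the work happens in the Reducer.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // One index shard will be generated per Reducer.
    job.setNumReduceTasks(4);

    // SOLR-1301-specific configuration, as shown above.
    SolrDocumentConverter.setSolrDocumentConverter(MyDocumentConverter.class, job.getConfiguration());
    job.setReducerClass(SolrBatchIndexerReducer.class);
    job.setOutputFormatClass(SolrOutputFormat.class);
    SolrOutputFormat.setupSolrHomeCache(new File("solr-conf/my-core"), job.getConfiguration());

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}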

How is the SOLR configuration distributed throughout the Hadoop cluster? On the machine from which we launch the indexing we need to have the SOLR configuration used for indexing. The patch will zip it and send it to the entire cluster through Hadoop’s DistributedCache mechanism.

For further information on SOLR-1301, you can see this link.

  • Deploying with the SOLR API:

For deploying we can use the SolrJ client library from the SOLR API. It contains a simple set of requests for handling the creation, swapping and unloading of SOLR cores.

// Create a temporary core on top of the newly generated index
CoreAdminRequest.Create req = new CoreAdminRequest.Create();
// ...
adminServer.request(req);

// Swap the temporary core with the core the front-end is serving from
CoreAdminRequest aReq = new CoreAdminRequest();
aReq.setAction(CoreAdminAction.SWAP);
aReq.setCoreName("liveCore");
aReq.setOtherCoreName("tmpCore");
// ...
adminServer.request(aReq);

Therefore, one requirement for this to work is that our SOLR must be configured in a multi-core fashion.
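
SolrJ also provides static convenience helpers that wrap the requests above. A rough sketch of a complete deployment follows; the server URL, core names and instance directory are illustrative (and CommonsHttpSolrServer was later renamed HttpSolrServer in newer SolrJ versions):

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;

public class CoreSwapDeploy {

  public static void main(String[] args) throws Exception {
    // Points at the admin endpoint of the multi-core SOLR instance (illustrative URL).
    SolrServer adminServer = new CommonsHttpSolrServer("http://localhost:8983/solr");

    // 1. Register a temporary core on top of the freshly generated index.
    CoreAdminRequest.createCore("tmpCore", "cores/tmpCore", adminServer);

    // 2. Swap it with the core the front-end is currently serving from.
    CoreAdminRequest.swapCore("liveCore", "tmpCore", adminServer);

    // 3. Unload the old index, which is now exposed under the temporary name.
    CoreAdminRequest.unloadCore("tmpCore", adminServer);
  }
}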

Use case example #2

Project Voldemort

  • Front-end that serves its data from Voldemort (a fast distributed key-value database developed by LinkedIn’s SNA team).
  • Creating the view with AbstractHadoopStoreBuilderMapper (see this link).
  • Deployment script swap-store.sh also described in this link.

Use case example #3

A custom use case that you develop yourself.

  • Your special front-end that serves your own particular data format.
  • Creation of the view:

How can you distribute the generation of the view across multiple shards? There are several options for this parallelism:

  • One shard per Reducer: For instance, in the typical case of equal partitioning, you can use a hash function to distribute the data evenly over “N” Reducers – for example, with Hadoop’s default Partitioner – so that each of them generates one view. This is the mechanism used by SOLR-1301, mentioned above (a minimal sketch of this follows the list).
  • One shard per Reducer group: For instance, if we want to group the shards logically by some key. If we want to generate one view per country, for example, this would be the simplest way: we would group the records by country in the Mapper and generate one view per Reducer group.
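
For reference, the “one shard per Reducer” option needs nothing more than what Hadoop’s default HashPartitioner already does; the sketch below reproduces that logic (the class name is illustrative):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ShardPartitioner extends Partitioner<Text, Text> {

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    // Mask the sign bit so the result is always a valid partition (shard) index.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}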

The best option in this case is to implement your own OutputFormat.
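
A minimal sketch of such an OutputFormat is shown below, assuming Text/Text records written as tab-separated lines (the class name and record format are illustrative; for the “one shard per Reducer group” variant the RecordWriter would simply roll over to a new file each time the group key changes):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ShardOutputFormat extends FileOutputFormat<Text, Text> {

  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // One file per Reducer, e.g. part-r-00000.shard, inside the job's output folder.
    Path file = getDefaultWorkFile(context, ".shard");
    final FSDataOutputStream out = file.getFileSystem(context.getConfiguration()).create(file, false);

    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text key, Text value) throws IOException {
        // Write each record in whatever format the front-end expects; here, "key<TAB>value" lines.
        out.write(key.getBytes(), 0, key.getLength());
        out.writeByte('\t');
        out.write(value.getBytes(), 0, value.getLength());
        out.writeByte('\n');
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }
}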

How can you write an arbitrary file type, to be served by the front-end, as the output of a Hadoop task? You need to use the Hadoop API to create a file of any kind in the task’s output folder. For instance:

// Output folder of the task and a local scratch file where the shard will be built
Path perm = FileOutputFormat.getOutputPath(context);
Path tmpLocalFile = new Path("shard-1");
FileSystem fS = perm.getFileSystem(context.getConfiguration());
Path localFileToUse = fS.startLocalOutput(perm, tmpLocalFile);
// ... write the shard into localFileToUse with any library / in any format ...
fS.completeLocalOutput(perm, tmpLocalFile);

  • Deploying the view: With the Hadoop Java API, a script, rsync, …
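
As an illustration of the “Hadoop Java API” route, the sketch below copies the finished shard out of HDFS and publishes it with an atomic symlink swap; all paths are illustrative, and an rsync-based script would serve the same purpose:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShardDeployer {

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // 1. Pull the complete, freshly generated shard out of HDFS into a versioned local directory.
    java.nio.file.Path newVersion = Paths.get("/var/frontend/shard-1-v2");
    fs.copyToLocalFile(new Path("/views/output/shard-1"), new Path(newVersion.toString()));

    // 2. Publish it by atomically repointing the symlink the front-end reads from.
    //    (On Linux, rename() replaces an existing symlink atomically, so the front-end
    //    never sees a missing or half-written view.)
    java.nio.file.Path tmpLink = Paths.get("/var/frontend/current.tmp");
    Files.deleteIfExists(tmpLink);
    Files.createSymbolicLink(tmpLink, newVersion);
    Files.move(tmpLink, Paths.get("/var/frontend/current"), StandardCopyOption.ATOMIC_MOVE);
  }
}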

Disadvantages / Cases where this cannot be applied

  • This is a scalable and efficient method for generating and deploying complete views atomically. It is not meant to be used for maintaining a view that is incrementally updated in real time.
  • As of the date of this article, ElasticSearch cannot be used as the front-end: in spite of Wonderdog and this good post, we cannot do an atomic deployment. The only way to load data is through incremental batched HTTP requests, which also makes the indexing process potentially inefficient.
  • Neither can MongoDB: pending changes in this project as of the publication date, it is not possible to generate a complete Mongo collection and deploy it atomically, so it has exactly the same problem as ElasticSearch.

Finally

In this presentation Marc Sturlese from Trovit explains some of the details of how Trovit generates their indexes with Hadoop.

And this is an advanced use case example explained by Marc de Palol: view generation using HFile files.


