
Pangool + SOLR

On this blog we have already talked about the convenience of using Hadoop for batch indexing, for instance to produce a Lucene inverted index that is then deployed into SOLR. We have also talked about SOLR-1301, the patch commonly used for SOLR indexing with Hadoop. In this article we present the recent integration of Pangool with SOLR, which is a lot simpler and more powerful.

Pangool + SOLR != Hadoop + SOLR

You probably already know about Pangool, a low-level library that we have developed to make Hadoop development easier. Among its virtues is built-in support for common patterns such as secondary sort and joins. Furthermore, it “augments” the Hadoop API with facilities such as instance-based configuration and native multiple Inputs/Outputs, and these two features in particular are the ones we have leveraged to create a smooth integration of Pangool with SOLR.

But what are the problems with the current Hadoop + SOLR integration?

  • It is not at all intuitive to use, and there is hardly any documentation on how to use it. You need to add several lines that call static methods, and you are forced to implement a DocumentConverter whether you want to or not… In sum, a nightmare.
  • It is limited and monolithic. You can’t, for instance, produce more than one kind of SOLR index as output of your Map/Reduce job.

Both problems stem from inherent defects in the Hadoop API. Pangool solves many of them, and that’s why we wanted to leverage it to offer a smooth, easy integration between Pangool and SOLR. To illustrate this, we are going to program an indexer of Shakespeare’s plays that produces one index per category of play (COMEDIES, HISTORIES, POETRY, TRAGEDIES).

Multi Shakespeare Indexer

This is the idea: we’ll download Shakespeare’s plays and uncompress them (here is the link). We’ll produce a SOLR index for each category with the following fields: line number, line text and play title. This way we can search for certain words and see which plays and which lines they appear in.
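As an aside, once one of the generated indexes has been deployed into a running SOLR server (which is outside the scope of this post), such a search could be performed with SolrJ. The following snippet is only an illustrative sketch and is not part of the example; the SOLR URL and the query term are made up:

// Illustrative sketch (not part of the example): querying a deployed index with SolrJ.
// The server URL and the query term below are assumptions.
HttpSolrServer solr = new HttpSolrServer("http://localhost:8983/solr");
SolrQuery query = new SolrQuery("text:sword");
QueryResponse response = solr.query(query);
for(SolrDocument document : response.getResults()) {
	// Print the play title and the line number where the word appears
	System.out.println(document.getFieldValue("title") + " - line " + document.getFieldValue("line"));
}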

To do this, we are going to program a Pangool task that will read the plays as input and, in the Map stage, emit a Tuple with the following data:

  • line number
  • line text
  • play title
  • category (COMEDIES, HISTORIES, POETRY, TRAGEDIES)

These Tuples will be received by the TupleReducer, which will emit each of them to a “named output” depending on its category. Each “named output” will be configured to use TupleSolrOutputFormat, the Pangool OutputFormat that implements the SOLR integration. This way, when the task finishes, we’ll have four indexes, each of them in a different folder: COMEDIES, HISTORIES, POETRY, TRAGEDIES.

Furthermore, with this approach each index will be split into “n” fragments, where “n” is the number of Reducers in the task. This is how distributed indexing is normally achieved, and the resulting fragments can later be served by a distributed SOLR using “sharding”.
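For instance, once the job has been built with the TupleMRBuilder shown below, the number of fragments per index can be controlled with the standard Hadoop API (the value used here is just an example):

// Sketch: the number of fragments per index equals the number of Reducers,
// which can be set on the Hadoop Job that Pangool creates.
Job hadoopJob = job.createJob();
hadoopJob.setNumReduceTasks(4); // e.g. four fragments per index; choose a value that suits your cluster
hadoopJob.waitForCompletion(true);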

Implementation

We can see the MultiShakespeareIndexer code here. Next we’ll briefly discuss the process we have followed to implement it.

First of all, we need to define a folder with the SOLR configuration we’ll use. In Pangool’s repository we have added the folder “examples/src/test/resources/shakespeare-solr” with the conf/schema.xml and conf/solrconfig.xml files. These XML files will be read by an embedded SOLR that performs the indexing. The most relevant piece is the following fragment of schema.xml, which defines which fields will be indexed and their types:

<fields>
  <field name="line" type="long" indexed="true" stored="true" sortMissingLast="true"/>
  <field name="text" type="string" indexed="true" stored="true" sortMissingLast="true"/>
  <field name="title" type="string" indexed="true" stored="true" sortMissingLast="true"/>
</fields>

<defaultSearchField>text</defaultSearchField> 

Next we’ll program the task itself. We’ll define Shakespeare’s categories as an enum:

public static enum Category {
	COMEDIES, HISTORIES, POETRY, TRAGEDIES
}

Next we’ll define the Pangool Schemas that we’re going to use throughout the task. We are going to use two Schema instances because the intermediate one will contain the category while the final one won’t (we don’t need to index the category for each line because we are already splitting the index into per-category indexes).

final static Schema SCHEMA = new Schema("shakespeare", Fields.parse("line:long, text:string, title:string, category:" + Category.class.getName()));
final static Schema OUT_SCHEMA = new Schema("shakespeare", Fields.parse("line:long, text:string, title:string"));

In order to be able to add the category to the intermediate tuples we need to have a stateful TupleMapper associated with each play. Therefore, we’ll create a CategoryMapper which will receive a Category and a play title, and we’ll associate each of Shakespeare’s plays with a different instance of CategoryMapper:

public static class CategoryMapper extends TupleMapper<LongWritable, Text> {

	Category category;
	String title;
	ITuple tuple = new Tuple(SCHEMA);

	public CategoryMapper(Category category, String title) {
		this.category = category;
		this.title = title;
		tuple.set("title", title);
		tuple.set("category", category);
	}

	@Override
	public void map(LongWritable key, Text value, TupleMRContext context, Collector collector) throws IOException, InterruptedException {
		tuple.set("line", key.get());
		tuple.set("text", value.toString());
		collector.write(tuple);
	}
}

To start configuring the task we’ll create a TupleMRBuilder, set the intermediate Schema, define a “group by” (we’re going to group by “line” to provide a more or less even distribution of the data so that each Reducer will produce similarly-sized sub-indexes) and we’ll iterate over the Categories in order to add inputs and outputs:

TupleMRBuilder job = new TupleMRBuilder(conf);
job.addIntermediateSchema(SCHEMA);
job.setGroupByFields("line");
...		
for(Category category : Category.values()) { // For each Category
	String categoryString = category.toString().toLowerCase();
	// Add the category, book title input spec with the associated CategoryMapper
	for(FileStatus fileStatus: fileSystem.listStatus(new Path(input + "/" + categoryString))) {
		job.addInput(fileStatus.getPath(), new HadoopInputFormat(TextInputFormat.class), new CategoryMapper(category, fileStatus.getPath().getName()));
	}
	// Add a named output for each category
	job.addNamedOutput(categoryString, new TupleSolrOutputFormat(new File("src/test/resources/shakespeare-solr"), conf), ITuple.class, NullWritable.class);
}

This part is particularly relevant:

new TupleSolrOutputFormat(new File("src/test/resources/shakespeare-solr"), conf)

That’s all we have to do to define an output that will produce a SOLR index: we pass the File corresponding to the folder that contains files such as conf/schema.xml and conf/solrconfig.xml, together with the Configuration object, through the constructor.
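In this example we attach TupleSolrOutputFormat to “named outputs”, but if we only needed a single index it could also be used as the job’s main output. The following snippet is a sketch of that variant, not part of the example:

// Sketch of a single-index variant: use TupleSolrOutputFormat as the job's main output
// instead of adding one named output per category.
job.setOutput(new Path(output), new TupleSolrOutputFormat(new File("src/test/resources/shakespeare-solr"), conf), ITuple.class, NullWritable.class);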

Next, we need to program a Reducer that will receive the intermediate Tuples and write each one to the corresponding “named output” depending on its category:

// The reducer will just emit the tuple to the corresponding Category output
job.setTupleReducer(new TupleReducer<ITuple, NullWritable>() {

	ITuple outTuple = new Tuple(OUT_SCHEMA);
			
	public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector) throws IOException, InterruptedException, TupleMRException {

		Category category = (Category) group.get("category"); 
		for(ITuple tuple: tuples) {
			outTuple.set("line",  tuple.get("line"));
			outTuple.set("text",  tuple.get("text"));
			outTuple.set("title", tuple.get("title"));
			collector.getNamedOutput(category.toString().toLowerCase()).write(outTuple, NullWritable.get());
		}
	}
});

In this case we don’t care about the Job’s main output so we can set it to NullOutputFormat:

job.setOutput(new Path(output), new HadoopOutputFormat(NullOutputFormat.class), ITuple.class, NullWritable.class);

Finally, we create the Hadoop Job and execute it, and that’s it!

Job hadoopJob = job.createJob();
hadoopJob.waitForCompletion(true);

Some things worth noting:

  • The tuples that we emit must conform to what we defined in the schema.xml. Pangool will convert them automatically to SolrInputDocument instances using the DefaultTupleDocumentConverter class.
  • If, for any reason, we need control over the conversion between ITuple and SolrInputDocument (for instance, in order to add “boosting” to one of the fields), we can pass our own implementation of TupleDocumentConverter to the TupleSolrOutputFormat instance, as sketched below.
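
As a rough illustration only, such a converter could look like the following. The method name, its signature and the way the converter is passed to TupleSolrOutputFormat are assumptions here; check Pangool’s javadocs for the actual interface:

// Rough sketch only: the TupleDocumentConverter method name and signature are assumed,
// not taken from Pangool's javadocs.
public static class BoostingDocumentConverter implements TupleDocumentConverter {

	public SolrInputDocument convert(ITuple tuple, NullWritable ignored) {
		SolrInputDocument document = new SolrInputDocument();
		document.addField("line", tuple.get("line"));
		// Boost the "text" field so that matches on the line text rank higher
		document.addField("text", tuple.get("text").toString(), 2.0f);
		document.addField("title", tuple.get("title").toString());
		return document;
	}
}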

This example is available from Pangool version 0.43.1 onwards. In order to execute it (take into account that you need to have Shakespeare’s plays uncompressed in HDFS, in a folder called “shakespeare”):

hadoop jar pangool-*-examples-job.jar multi_shakespeare_indexer shakespeare/ out-shakespeare

Conclusion

We have shown a simple Pangool + SOLR use case in which we indexed all of Shakespeare’s plays into four different indexes, one per category. Pangool’s SOLR integration is very simple to use: instantiating a TupleSolrOutputFormat is all that is needed. Moreover, we can use this OutputFormat in any “named output” in order to produce more than one index in the same task, which is not possible with the plain Hadoop + SOLR integration.


