
Building a parallel text classifier in Hadoop with Pangool

Some weeks ago we introduced Pangool, a library that aims to be a replacement for the default Hadoop MapReduce API, keeping its low-level flexibility while being much easier to use. In this post we’ll show how to build a simple distributed text classifier with Pangool.

This example will show how, even though we are working with a low-level library, the most common tasks end up being extraordinarily simple to perform: in a short amount of time and with just a few lines of code we can build something as complete as a scalable text classifier. We’re going to implement a text classifier that performs a “sentiment analysis” task: given an input text, we expect the classifier to tell us whether the text’s sentiment is positive or negative. The examples we will work with are hotel reviews, such as: “Fantastic hotel!”. Text classification is a machine-learning task commonly used nowadays for jobs like detecting spam, categorizing documents and so on.

Our classifier will be based on Naive Bayes with “add-one smoothing”.

Training

The first step of a classification task is to generate the model that will be used for the classification itself. We would like model generation to be scalable, so that if we have a lot of training text we can simply add more machines and still finish in a short amount of time. To achieve this, we will distribute the training by writing a Map/Reduce job with Pangool.

The Map/Reduce job will receive text files in which each line contains a label and a text, separated by a tab character (to keep things simple, we will assume that the text does not contain any other tabs). For instance:

POSITIVE The room was fantastic. Plenty of light.

The job will process these data in parallel, tokenize and normalize each word in the text, and emit a count (1) for each word and category:

POSITIVE, the, 1
POSITIVE, room, 1
POSITIVE, fantastic, 1
...

We’ll then group by category and word and aggregate the partial counts in order to obtain a total count per word and category, something like:

POSITIVE, fantastic, 251
POSITIVE, good, 186
...
NEGATIVE, fantastic, 2
NEGATIVE, dark, 74
NEGATIVE, annoying, 89

This problem is very similar to the “word count” problem that is always used as the canonical Hadoop example. However, in this case we need to emit three-field records and group by two of them, which the plain Hadoop API doesn’t support easily, forcing us to write a lot of tedious and complicated code.

This is the Pangool code that implements the model generator:

TupleMRBuilder job = new TupleMRBuilder(conf, "Naive Bayes Model Generator");
job.addIntermediateSchema(INTERMEDIATE_SCHEMA);
job.addInput(new Path(inputExamples), new HadoopInputFormat(TextInputFormat.class),
	new TupleMapper<LongWritable, Text>() {

	ITuple tuple = new Tuple(INTERMEDIATE_SCHEMA);

	@Override
	public void map(LongWritable toIgnore, Text value, TupleMRContext context, Collector collector)
		throws IOException, InterruptedException {

		// Each input line has the form: <category>TAB<text>
		Category category = Category.valueOf(value.toString().split("\t")[0]);
		StringTokenizer itr = new StringTokenizer(value.toString().split("\t")[1]);
		tuple.set("category", category);
		tuple.set("count", 1);
		// Emit one (category, word, 1) tuple per token
		while(itr.hasMoreTokens()) {
			tuple.set("word", normalizeWord(itr.nextToken()));
			collector.write(tuple);
		}
	}
});
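
The mapper delegates word normalization to a normalizeWord() helper, whose exact implementation is not important here. A minimal sketch of such a helper (illustrative only, not necessarily what the full example does) could simply lowercase the token and strip punctuation:

// Illustrative sketch of the normalization helper used by the mapper
public static String normalizeWord(String word) {
	return word.toLowerCase().replaceAll("\\p{Punct}", "");
}

The counting reducer, which we will also reuse as a combiner, looks like this: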

TupleReducer countReducer = new TupleReducer<ITuple, NullWritable>() {

	@Override
	public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector)
		throws IOException, InterruptedException, TupleMRException {

		// Sum the partial counts for this (word, category) group
		int count = 0;
		ITuple outputTuple = null;
		for(ITuple tuple : tuples) {
			count += (Integer) tuple.get("count");
			outputTuple = tuple;
		}
		outputTuple.set("count", count);
		collector.write(outputTuple, NullWritable.get());
	}
};
job.setTupleCombiner(countReducer);
job.setTupleReducer(countReducer);
job.setGroupByFields("word", "category");
job.setTupleOutput(new Path(output), INTERMEDIATE_SCHEMA);

The M/R code can be self-contained in a single method if the class that declares it is Serializable. This is one of the virtues of the configuration-by-instance model that Pangool uses: it doesn’t force us to declare a class for each element of the M/R stack. As we can see, the job uses an intermediate schema (INTERMEDIATE_SCHEMA), which is key to being able to work with tuples with an arbitrary number of fields. We define this schema in the following manner:

INTERMEDIATE_SCHEMA = new Schema("categoryCounter", Fields.parse("category:" + Category.class.getName() + ", word:string, count:int"));

Here, we specify that word is a string, count an integer and category a Java “enum” that contains the categories that we want to classify:

public enum Category {
	POSITIVE, NEGATIVE
}

Indeed, we can use enum types natively in Pangool’s tuples without having to worry about how to serialize them. We can see the complete code of the model generator here. To make the classifier accept other categories, we only need to change the Category enum, and the rest of the code will keep working as-is.
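
For instance, a purely illustrative version supporting a third category would simply be:

// Illustrative: the rest of the code works unchanged with more categories
public enum Category {
	POSITIVE, NEGATIVE, NEUTRAL
}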

Classification

The second step is the classification itself. To do this we will read the output of the previous M/R job and load it into convenient in-memory structures for executing the classification algorithm:

public void init(Configuration conf, Path generatedModel) throws IOException, InterruptedException {
	FileSystem fileSystem = FileSystem.get(conf);
	for(Category category : Category.values()) {
		wordCountPerCategory.put(category, new HashMap<String, Integer>()); // init the word counts per category
	}
	// Use a HashSet to calculate the total vocabulary size
	Set<String> vocabulary = new HashSet<String>();
	// Read the tuples produced by the model-generation job
	for(FileStatus fileStatus : fileSystem.listStatus(generatedModel)) {
		TupleInputReader reader = new TupleInputReader(conf);
		reader.initialize(fileStatus.getPath(), conf);
		while(reader.nextKeyValueNoSync()) {
			// Read one (category, word, count) tuple
			ITuple tuple = reader.getCurrentKey();
			Integer count = (Integer) tuple.get("count");
			Category category = (Category) tuple.get("category");
			String word = tuple.get("word").toString();
			vocabulary.add(word);
			// Accumulate the total number of tokens seen per category
			tokensPerCategory.put(category, MapUtils.getInteger(tokensPerCategory, category, 0) + count);
			wordCountPerCategory.get(category).put(word, count);
		}
	}
	V = vocabulary.size();
}
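
The init() method above relies on a few instance fields that are not shown in the snippet. Their declarations would look roughly like this (names taken from the code above; the exact details in the full example may differ):

// Word counts per category: category -> (word -> count)
Map<Category, Map<String, Integer>> wordCountPerCategory = new HashMap<Category, Map<String, Integer>>();
// Total number of tokens seen per category
Map<Category, Integer> tokensPerCategory = new HashMap<Category, Integer>();
// Size of the global vocabulary
int V;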

Basically, what we are doing is storing the word count for each possible category in a String-to-Integer map, as well as calculating the total number of tokens per category and the global number of unique words. We need these data in memory so that we can use them to execute the classification itself, as follows:

public Category classify(String text) {
	StringTokenizer itr = new StringTokenizer(text);
	Map<Category, Double> scorePerCategory = new HashMap<Category, Double>();
	// Accumulate, per category, the sum of the log-probabilities of every token
	while(itr.hasMoreTokens()) {
		String token = NaiveBayesGenerate.normalizeWord(itr.nextToken());
		for(Category category : Category.values()) {
			// "add-one smoothing": unseen words count as 1
			int count = MapUtils.getInteger(wordCountPerCategory.get(category), token, 0) + 1;
			double wordScore  = Math.log(count / (double) (tokensPerCategory.get(category) + V));
			scorePerCategory.put(category, MapUtils.getDouble(scorePerCategory, category, 0.) + wordScore);
		}
	}
	// Once all tokens have been seen, pick the category with the highest total score
	double bestScore = Double.NEGATIVE_INFINITY;
	Category bestCategory = null;
	for(Map.Entry<Category, Double> entry : scorePerCategory.entrySet()) {
		if(entry.getValue() > bestScore) {
			bestScore = entry.getValue();
			bestCategory = entry.getKey();
		}
	}
	return bestCategory;
}

The chosen category is the one that maximizes the sum of the log-probabilities of each word conditioned on the category. As mentioned before, we apply “add-one smoothing” to take into account words that are unknown to the model.
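
Written as a formula, the decision rule implemented above is the following (note that this simple version omits the class-prior term log P(c), which is equivalent to assuming a uniform prior over the categories):

\hat{c} = \arg\max_{c} \sum_{w \in \text{text}} \log \frac{\mathrm{count}(w, c) + 1}{\mathrm{tokens}(c) + V}

where count(w, c) is the number of times word w appeared with category c during training, tokens(c) is the total number of tokens seen for category c, and V is the vocabulary size.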

Up to this point, the Java classification code doesn’t seem to have anything to do with Pangool or with a distributed algorithm. But we would like our Bayes classifier to classify as many texts as we want in parallel, adding more machines when needed. To make this happen, we just need to add a few more lines:

init(conf, new Path(modelFolder));

MapOnlyJobBuilder job = new MapOnlyJobBuilder(conf);
job.setMapper(new MapOnlyMapper<LongWritable, Text, Text, NullWritable>() {
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Append the predicted category to each input line
		value.set(value.toString() + "\t" + classify(value.toString()));
		context.write(value, NullWritable.get());
	}
});
job.setOutput(new Path(output), new HadoopOutputFormat(TextOutputFormat.class), Text.class, NullWritable.class);
job.addInput(new Path(input), new HadoopInputFormat(TextInputFormat.class));
job.createJob().waitForCompletion(true);

We can see the complete classifier code here. We have used Pangool’s “map-only jobs” API so that our classifier can run in parallel on Hadoop.

Testing the generator and classifier

This example is available from version 0.40.3-SNAPSHOT of Pangool. With Hadoop running in pseudo-distributed mode, we can run the following test from Pangool’s root directory:

cd examples
mvn assembly:assembly
hadoop fs -put src/test/resources/nb-* .
hadoop jar target/pangool-examples-*-job.jar naive_bayes_generate nb-examples.txt out-bayes-model
hadoop jar target/pangool-examples-*-job.jar naive_bayes_classifier out-bayes-model/p* nb-test-examples.txt out-classify
hadoop fs -cat out-classify/p*

We should see the results of the classification:

Fantastic hotel! POSITIVE
Very disappointing. The room was small and dark. NEGATIVE

The example we just ran trains and classifies very few data items, but by simply changing the input to the generator and the classifier, our little Naive Bayes classifier can scale as far as we need!


