An example “lambda architecture” for real-time analysis of hashtags using Trident, Hadoop and Splout SQL

In this post we will show how to use Trident, Hadoop and Splout SQL together to build a toy example “lambda architecture”. We will learn the basics of Trident, a higher-level API on top of Storm, and Splout SQL, a fast read-only SQL database for Hadoop. The example architecture is hosted on this github project. We will simulate counting the number of appearances of hashtags in tweets, by date. The ultimate goal is to solve this simple problem in a fully scalable way and provide a remote low-latency service for querying the evolution of a hashtag’s counts, including both consolidated and real-time statistics.

So for any hashtag we want to be able to query a remote service and obtain per-date counts in a data structure like this:

{
  "20091022":115,
  "20091023":115,
  "20091024":158,
  "20091025":19
}

Lambda architecture

The “lambda architecture” is a concept developed by Nathan Marz. We like it because it provides an architectural model that scales and that combines the advantages of long-term batch processing with the freshness of a real-time system, with data updated within seconds. We didn’t find toy examples of it on the Internet, so we decided it would be useful to develop one and share it.

Trident

Trident is an API on top of Storm. We already talked about Storm before and showed a toy example of it in this post. Trident provides higher-level constructs on top of Storm, similar to those that Cascading provides to Hadoop (each(), groupBy()). It also provides wrappers and primitives for saving/retrieving state in a topology, be it in memory or in a persistent datastore.

Splout SQL

Splout SQL is a database developed by us that can pull data from Hadoop very efficiently. You can think of it as an ElephantDB with SQL. It is a partitioned, read-only, highly performant SQL database for Hadoop-scale datasets. We presented it in this previous post.

Gluing all the pieces together to form a Lambda

Tweets are fed into the system (1), for example through a queue from which we can pull them (Storm has connectors for Kafka, JMS and Kestrel, but you can easily develop your own). A Trident stream (2) saves them into Hadoop (HDFS) and processes them in real time, building an in-memory state with the counts by date. In Hadoop (3), where we have all the historical data available, we can run a batch process that aggregates the tweets by hashtag and date and generates a big file from them. After that, we can use Splout SQL’s command-line or API tools for indexing the file and deploying it to a Splout SQL cluster (4), which will be able to serve all the statistics very fast. Then, a second Trident stream (DRPC) can be used to serve timeline queries (5): this stream queries both the batch layer (through Splout SQL) and the real-time layer (through the first stream’s memory state), and merges the results into a single timeline response. We will look at each of these pieces in more detail.

The batch layer

The batch layer is the simplest one. We just need to append all tweets in HDFS and periodically run a simple process that aggregates them by hashtag and hour. We didn’t develop this part; instead, we used a small sample taken from a bigger Infochimps dataset. The dataset looks like this and can be seen in this link:

hashtag	2009081313	1	calirfonia
hashtag	2009101713	1	caliroadtrip
hashtag	2009101815	2	caliroadtrip
hashtag	2009080813	1	caliroots
hashtag	2009092807	1	caliroots

There is plenty of literature and examples on the Internet on how to do such simple tasks with (from lower to higher level): Pangool, Cascading, Pig or Hive. The idea is that the output of the batch layer should look like the sample above: a tab-separated text file with hashtag counts. Because you keep all the tweets in HDFS, you can run a batch process that calculates many other things, and that recalculates everything from scratch every time. You have complete freedom and fault-tolerance here.
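
As a reference for what this job has to compute, here is a local, non-distributed sketch of the aggregation logic. It is illustrative only: the method name and input layout are assumptions, and a real implementation would be a Pangool, Cascading, Pig or Hive job reading from HDFS.

public static Map<String, Integer> aggregateHashtagsByHour(Path tweets) throws IOException {
  // key = "<hashtag>\t<yyyyMMddHH>", value = count
  Map<String, Integer> counts = new HashMap<String, Integer>();
  for(String line : Files.readAllLines(tweets, Charset.forName("UTF-8"))) {
    String[] parts = line.split("\t", 2);      // assumed layout: "<yyyyMMddHH>\t<tweet text>"
    String hourBucket = parts[0];
    for(String word : parts[1].split("\\s+")) {
      if(word.startsWith("#")) {
        String key = word.substring(1).toLowerCase() + "\t" + hourBucket;
        Integer current = counts.get(key);
        counts.put(key, current == null ? 1 : current + 1);
      }
    }
  }
  return counts;
}

Each (hashtag, hour) entry would then be written out tab-separated as label, date, count and hashtag, matching the sample above and the schema we will give to Splout SQL below.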

The serving layer

One of the challenges of the architecture is being able to serve low-latency queries under high throughput (multiple concurrent users) for a dataset that can be gigabytes or even terabytes in size. Because you calculate almost all the statistics in Hadoop, you need some kind of bridge between it and a fast database: you can’t directly query the files generated by Hadoop. Things like Impala or Drill allow you to interactively explore the dataset, but you can’t just put them in front of a web server. You need indexes or B-Trees that make lookups really fast.

You could just dump the statistics to a key/value store like Voldemort or ElephantDB, keyed by hashtag. But what if you wanted to provide daily, weekly or monthly timelines? What if you wanted to group by hour to show the most active hours? Using a fast key/value store for Hadoop means you need to pre-calculate everything your view is going to need, which isn’t always satisfying. Splout SQL is usually more convenient as it allows richer, more flexible querying of pre-aggregated data.

In the example’s github repository you will find instructions on how to quickly download Splout SQL and load the example dataset into it. Splout SQL’s command-line tools for indexing and deploying look like this:

hadoop jar splout-hadoop-*-hadoop.jar simple-generate -i sample-hashtags -o out-hashtags -pby hashtag -p 2 -s "label:string,date:string,count:int,hashtag:string" --index "hashtag,date" -t hashtags -tb hashtags
hadoop jar splout-hadoop-*-hadoop.jar deploy -q http://localhost:4412 -root out-hashtags -ts hashtags

The first line generates the indexed data structures (SQL files) needed for serving queries fast and the second line launches a deploy process that moves the files from the Hadoop cluster to the Splout SQL serving cluster.

Because Splout SQL is partitioned, you need to specify the partitioning fields and the table schema, as well as the number of partitions to generate. You can also specify things like “fields to be indexed” if you know what kind of queries you will run. In this case we created a compound index on “hashtag” and “date”, which would allow us to extend the application to query the data over arbitrary time periods.
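
For instance, a query like the following (a hypothetical variant of the one used later in HashTagsSploutQuery, not part of the example project) would be served efficiently by that compound index:

// Hypothetical range query: restrict a hashtag's counts to an arbitrary period.
// The (hashtag, date) compound index serves both the equality and the range condition.
String hashTag = "california";
String from = "2009102200", to = "2009102523"; // yyyyMMddHH bounds, inclusive
String sql = "SELECT SUM(count), substr(date, 0, 9) AS day FROM hashtags"
    + " WHERE hashtag = '" + hashTag + "'"
    + " AND date BETWEEN '" + from + "' AND '" + to + "'"
    + " GROUP BY day;";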

But for Splout SQL to be used by Trident’s DRPC we need to build a connector for it (SploutState). This connector extends ReadOnlyState and provides the necessary StateFactory required by Trident:

public class SploutState extends ReadOnlyState {
  // The Splout Java client
  private SploutClient sploutClient;
  ...
  public static class Factory implements StateFactory {
    ...
  }
}

It then implements a method that can receive multiple queries and execute them using Splout’s Java client:

public List<Object> querySplout(String tablespace, List<String> sql, List<String> keys) {
  ...
}

Note how each query is associated with a partition key: this is necessary because Splout SQL only hits one partition per query. And why a list of queries? Because Trident groups Tuples into mini-batches, which can be executed more efficiently than issuing them one by one.

Another thing we need to do is implement a BaseQueryFunction (HashTagsSploutQuery) that will be used in conjunction with SploutState to define the DRPC stream. This function contains the business logic for querying Splout and returning the data we want. SploutState can be used together with any BaseQueryFunction; in this case, our function looks like:

public class HashTagsSploutQuery extends BaseQueryFunction<SploutState, Object> {

  public List<Object> batchRetrieve(SploutState state, List<TridentTuple> args) {
    List<String> sqls = new ArrayList<String>();
    List<String> partitionKeys = new ArrayList<String>();
    // fill the data
    for(TridentTuple arg: args) {
      String hashTag = arg.getString(0);
      sqls.add("SELECT SUM(count), substr(date, 0, 9) as day FROM hashtags WHERE hashtag = '" + hashTag + "' GROUP BY day;");
      partitionKeys.add(hashTag);
    }
    return state.querySplout(TABLESPACE, sqls, partitionKeys);
  }

  public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
    collector.emit(new Values(result));
  }
}

The first method, batchRetrieve(), is called by Trident with a batch of Tuples. We then call the underlying SploutState to resolve the queries, using a custom SQL statement that groups by day and filters by hashtag. The second method, execute(), is called to emit the data that will be appended to the Tuples in the stream; in this case we append one more field to the tuple containing the result of each query.

We will see later how this part is connected to the rest of the system in more detail.

The real-time layer

The real-time layer is implemented using a Trident stream that saves state into a memory map. The code can be found in the topology class (LambdaHashTagsTopology) and it looks like this:

TridentState hashTagCounts = topology
  .newStream("spout1", spout)
  // note how we carry the date around
  .each(new Fields("tweet", "date"), new Split(), new Fields("word"))
  .each(new Fields("word", "date"), new HashTagFilter(), new Fields("hashtag"))
  .groupBy(new Fields("hashtag"))
  .persistentAggregate(mapState, new Fields("hashtag", "date"), new CountByDate(), new Fields("datecount"));

When developing Trident streams you have to keep two things in mind. One is the way Tuples evolve through the stream: each() functions process a set of input Fields and emit a Tuple containing those input fields together with the output Fields that the function emits, whereas aggregate() functions only emit the fields derived from the function. If you want to keep only a subset of the Fields after an each() you can use project(). The other thing is that, because of this, you can’t emit output Fields with the same name as an input Field (they would collide in the resulting Tuple).
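
A minimal sketch of this behavior, reusing the topology, spout and Split function from the snippet above (the field names are just illustrative):

Stream stream = topology
  .newStream("spout1", spout)                                 // tuples: ["tweet", "date"]
  .each(new Fields("tweet"), new Split(), new Fields("word")) // tuples: ["tweet", "date", "word"]
  .project(new Fields("word", "date"));                       // tuples: ["word", "date"]
// An each() at this point could not declare an output Field named "word" or "date":
// it would collide with an input Field still carried in the Tuple.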

Developing Trident streams follows the same programming pattern as Cascading: you create custom Functions or Filters that execute the business logic you are interested in, and then inject an instance of them into the flow. For this case, we created “HashTagFilter”, which takes an input word and only emits it if it is a hashtag (it starts with “#”), in which case it emits all characters but the first one:

public static class HashTagFilter extends BaseFunction {
  public void execute(TridentTuple tuple, TridentCollector collector) {
    String word = tuple.getString(0);
    if(word.startsWith("#")) {
      collector.emit(new Values(word.substring(1, word.length())));
    }
  }
}

We also created an aggregator called “CountByDate”, which is executed before persisting all the tuples for a given hashtag. Because we want to show timelines, we need to group the real-time counts by some date; for simplicity we just grouped by day, but we could have extended this further. Keyed by hashtag, we will save a Map of date -> count. Because all of this is executed in parallel, we can provide a combiner function for efficiency (as in Hadoop). The combiner just merges two maps that may have different keys.

public static class CountByDate implements CombinerAggregator<Map<String, Long>> {

  public Map<String, Long> init(TridentTuple tuple) {
    Map<String, Long> map = zero();
    map.put(tuple.getString(1), 1L);
    return map;
  }

  public Map<String, Long> combine(Map<String, Long> val1, Map<String, Long> val2) {
    for(Map.Entry<String, Long> entry : val2.entrySet()) {
      val2.put(entry.getKey(), MapUtils.getLong(val1, entry.getKey(), 0L) + entry.getValue());
    }
    for(Map.Entry<String, Long> entry : val1.entrySet()) {
      if(val2.containsKey(entry.getKey())) {
        continue;
      }
      val2.put(entry.getKey(), entry.getValue());
    }
    return val2;
  }

  public Map<String, Long> zero() {
    return new HashMap<String, Long>();
  }
}

We will now see how the serving layer and the real-time layer are connected through the DRPC service.

The DRPC service

One of Storm’s strengths is its ability to execute distributed RPC calls and parallelize them. We can use this service to populate a website that shows timelines for hashtags. Now that we have a batch layer that computes all historical counts and feeds them into Splout SQL, and a real-time layer that updates hashtag counts within seconds, how do we put it all together?

The DRPC service we add to the topology looks like this:

topology
  .newDRPCStream("hashtags", drpc)
  .each(new Fields("args"), new Split(), new Fields("hashtag"))
  .groupBy(new Fields("hashtag"))
  .stateQuery(hashTagCounts, new Fields("hashtag"), new MapGet(), new Fields("resultrt"))
  .stateQuery(sploutState, new Fields("hashtag", "resultrt"), new HashTagsSploutQuery(), new Fields("resultbatch"))
  .each(new Fields("hashtag", "resultrt", "resultbatch"), new LambdaMerge(), new Fields("result"))
  .project(new Fields("result"));

Queries are parallelized by “hashtag”. Two queries are executed in sequence: one to the real-time layer (hashTagCounts) and the other to the batch serving layer (sploutState). Note how we use Trident’s built-in MapGet() for querying the real-time layer and HashTagsSploutQuery() for querying Splout SQL, and how the Tuple evolves to have three fields: the hashtag and the result of each layer. Finally, a function called LambdaMerge() merges both results into a new Tuple field called “result”, and we project the stream down to a one-field Tuple that is returned to the DRPC caller.
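
To give an idea of how this stream is invoked, here is a minimal local-mode sketch. It assumes a hypothetical buildTopology() helper that builds the streams shown above around the LocalDRPC instance passed to newDRPCStream(); the example project wires this up in its own way.

LocalDRPC drpc = new LocalDRPC();
StormTopology stormTopology = buildTopology(drpc); // hypothetical: builds the streams above with newDRPCStream("hashtags", drpc)
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("lambda-hashtags", new Config(), stormTopology);

// Each call returns the consolidated (batch + real-time) timeline for the hashtag.
String timeline = drpc.execute("hashtags", "california");
System.out.println(timeline);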

The business logic for merging the results from both layers is straightforward. We assume the batch layer is always right: therefore, if a value comes from it, it overrides any possible value from the real-time layer. We also return the Map sorted by key for convenience:

public static class LambdaMerge extends BaseFunction {

  public void execute(TridentTuple tuple, TridentCollector collector) {
    Map<String, Long> resultRealTime = (Map<String, Long>) tuple.get(1);
    QueryStatus resultBatch = (QueryStatus) tuple.get(2);
    TreeMap<String, Long> consolidatedResult;

    if(resultRealTime != null) {
      consolidatedResult = new TreeMap<String, Long>(resultRealTime);
    } else {
      consolidatedResult = new TreeMap<String, Long>();
    }
    if(resultBatch != null) {
      if(resultBatch.getResult() != null) {
        for(Object rowBatch : resultBatch.getResult()) {
          Map<String, Object> mapRow = (Map<String, Object>) rowBatch;
          String day = (String) mapRow.get("day");
          Long count = Long.parseLong(mapRow.get("SUM(count)").toString());
          consolidatedResult.put(day, count);
        }
      }
    }
    collector.emit(new Values(consolidatedResult));
  }
}

Trying it

You can follow the instructions on github for trying this example. The example uses two fake tweets for the real-time layer (“#california is cool”, “I like #california”) and queries Splout SQL, where you will have loaded the example dataset – which also contains counts for california. So after running everything you should see something like this:

...
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":76}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":136}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":192}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":232}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":286}]]
...

As you can see, the count for the last date (which should match today’s date) increases in real time, while the historical dates stay constant – they come from Splout SQL.

Conclusions and more

Through this example we have shown Trident, an interesting and useful higher-level API on top of Storm, and Splout SQL, a fast, partitioned, read-only SQL database for Hadoop. We have also shown a real example of a fully scalable “lambda architecture”, omitting certain parts that were not so relevant to the example.

For this example to be complete, though, we need to mention the role of a “master coordinator”. This coordinator should trigger the batch layer periodically, and it should ensure that the batch layer always processes complete data: if we group by hour, no partial hour should be processed by the batch layer. By meeting this condition we can safely overwrite data from the real-time layer. On the other hand, for the real-time layer to be efficient, we would need an expiration mechanism so that only a rolling time frame is kept in it. Care has to be taken to expire data soon enough to minimize the memory footprint, but not so soon that the batch layer doesn’t have time to complete.
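
As a hint of what such a coordinator would compute, here is a small, hypothetical sketch (not part of the example project) that determines the newest hour bucket the batch layer is allowed to process, i.e. everything up to the last complete hour:

import java.text.SimpleDateFormat;
import java.util.Calendar;

public class BatchCutoff {
  /** Returns the newest fully-closed hour bucket, formatted as yyyyMMddHH. */
  public static String lastCompleteHour() {
    Calendar cal = Calendar.getInstance();
    cal.add(Calendar.HOUR_OF_DAY, -1);   // step back one hour: the current hour is still open
    cal.set(Calendar.MINUTE, 0);         // truncate to the hour boundary
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    return new SimpleDateFormat("yyyyMMddHH").format(cal.getTime());
  }
}

The batch layer would then aggregate only tweets up to this bucket, and the real-time state could safely expire anything older once the corresponding Splout SQL deploy has completed.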

As mentioned before, the full code with comments is on github and you can follow the README for executing it locally.



8 Comments on "An example “lambda architecture” for real-time analysis of hashtags using Trident, Hadoop and Splout SQL"

  1. Neither Drill nor Impala builds indexes; like Google’s Dremel, they perform full scans.

  2. Hi Tim,
    Yes, that’s what we say in the post: that Impala or Drill can’t just be put in front of a web server because they don’t have indexes.

  3. Hey Pere
    thanks for this great post, pretty much time saving, i have been looking into some much more easy to get started with sploutsql, and it comes in on the exact area where i am currently working on some social/sentiment analytics service, i read Nathan Lambda architecture but running Hbase at the very beginning wouldn’t be good for me as i wouldn’t be able to afford the cost of running at Hbase cluster, thanks alot for this useful information you shared. wish you guys at datasalt the best.

    Ebot.

  4. Thanks for your feedback Ebot, and don’t hesitate to contact us for help with Splout SQL… We are very interested in use cases for it!

  5. Revan says:

    Hi Bertran,

    Thanks for your very useful post. I’ve learned a lot about the lambda architecture from it.
    However, I have a question for you. In the lambda architecture, the generated tweets are also persisted into HDFS, but in your demo I don’t see this part and the initial tweet data is always the same; it should be updated with the newly generated tweets, right? Could you help me complete this part?

    Revan.

  6. Pere says:

    Hello Revan,

    In this post we put the emphasis on the batch / real-time processing and merging part; we didn’t implement a tweet collector. As suggested, this could be implemented with a backing queue (e.g. Kafka) from which Storm pulls new tweets (search for “Kafka Spout”). For persisting the tweets to HDFS there are various options. One could be to use HBase, although that implies one more system to maintain. Another option could be for every Storm worker to open a file in HDFS and close it periodically. I haven’t tried it myself, but Nathan Marz seems to have some tools to alleviate this part: https://github.com/nathanmarz/dfs-datastores . One detail in this idea is fault-tolerance: with Storm’s default at-least-once semantics it would be possible to persist the same tweet more than once into HDFS. This would have to be taken into account by the batch processing layer, e.g. by deduplicating the tweets by tweet id. Yet another option would be to persist Kafka data directly into HDFS without Storm, by running a MapReduce job (see https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-consumer and http://oobaloo.co.uk/tag/hdfs).

  7. Marius Soutier says:

    Thanks for this example.

    Just one question: one advantage of using Splout is that the client can run arbitrary queries against it. Your SploutState also supports this; however, the topology has hard-coded queries. Is there an easy way to combine an ad-hoc query with a topology?

  8. Hello Marius,

    Indeed, this example is just doing a very basic aggregation, but there is no problem in making everything more parametric. For instance, a parameter could be read in the DRPC service indicating the kind of query to be issued. The only thing to keep in mind is that everything should be mergeable between the batch and real-time layers: if you provided an arbitrary SQL query as a parameter, and the real-time layer had SQL capabilities, care would need to be taken when merging the results.
