
Real-time feed processing with Storm

One of the most interesting open-source projects released in recent months is Storm, created by Nathan Marz at BackType (now part of Twitter). This video shows the basic use cases for Storm and why Storm is needed. There is also a very good wiki where we can learn all sorts of things about Storm.

In this article I will present a practical use case that we can implement with Storm, and discuss why Storm is a very good tool for it.

“Real-time feed processing”

The problem is this: We have a massive amount of data that we need to parse from different data sources, such as XML feeds. This use case can be found, for instance, in vertical search engines. In this post we have already seen how to implement a scalable architecture for batch-processing and creating an index over this data. But what happens if we want to update our data in real time, for instance by parsing incremental feeds that change very often?

Simple solution (one machine)

A simple way to approach this problem would be to have a single process that updates the feeds continuously. The problem with this solution is that it is limited to a single machine's computing and downloading capacity.

Scalable, complex solution

If one machine is not enough, we can use N machines and replicate the process, for instance by "mod hashing" the feed ID on each machine (feedId % n). In this way each machine is in charge of only a subset of the feeds. This solution is scalable, but it has several problems:

  • We have to scale manually. Adding capacity means setting up and deploying a new service ourselves.
  • There is no automatic fail-over. If one machine fails, its subset of the feeds will not be updated until we intervene manually.
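The "mod hashing" assignment above can be sketched in a few lines. This is a hypothetical helper (the class and method names are illustrative, not part of the example project): machine k of n is responsible exactly for the feeds whose ID satisfies feedId % n == k.

```java
// Hypothetical sketch of "mod hashing" feed assignment.
// Machine `machineIndex` (out of `totalMachines`) processes only
// the feeds for which feedId % totalMachines == machineIndex.
public class FeedPartitioner {

  private final int machineIndex;
  private final int totalMachines;

  public FeedPartitioner(int machineIndex, int totalMachines) {
    this.machineIndex = machineIndex;
    this.totalMachines = totalMachines;
  }

  // True if this machine is responsible for the given feed.
  public boolean ownsFeed(int feedId) {
    return feedId % totalMachines == machineIndex;
  }
}
```

Note that this assignment is static: if a machine disappears, its slice of the feed IDs is simply orphaned, which is exactly the fail-over problem described above.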

Queue / Workers

One of the problems of the second solution can be solved by implementing a master service that submits work to slave workers through a queue. In this way, if one of the workers goes down, the other ones can still process the full set of feeds.

Queue / Workers is a common pattern that is applied to many different problems.

However, this pattern has some inherent complexities: What kind of queue should we use? How should we admin / deploy / maintain the queue?

Hadoop to the rescue?

Another possible solution would be to implement feed updating with Hadoop. We would program a Map/Reduce job that would emit the feeds in the Map step and receive them in the Reduce step. In this way each Reducer would have a subset of the feeds and would be able to work on them. However, a Map/Reduce job takes as long to complete as its slowest Reducer. We would therefore have unknown cycle-completion times, and the "real-time property" of the system could not be assured.

Multi-level workers

Another problem that could arise is the following: Let’s imagine we have N processes updating feeds but we don’t want all of them to update the database directly. There are some reasons why we might want to do this: efficiency, locality, database configuration… In this case we would need another set of M processes (where M < N) which would be the ones in charge of updating the database, buffering, and so on. How should we connect them? Through another queue. Things are starting to become more and more complex!

Isn’t there a framework that allows us to deploy a (possibly multi-level) Queue / Workers system without the inherent complexities of such systems?

Good news: this system exists and it is called Storm!

Real-time feed processing with Storm

Storm is a framework that abstracts the inherent complexities of any Queue / Workers system. It is a generalization of these architectures that allows us to write real-time topologies without needing to worry about details like how to scale, how to implement fail-over or Inter-Process-Communication.

Storm has a master node (called Nimbus) and N slave nodes (called Supervisor). It implements coordination through Zookeeper, Inter-Process-Communication through 0MQ and it allows us to implement topologies in Java and even in some dynamic languages.

In order to implement the real-time feeds topology with Storm we need to define a Spout (FeedSpout) and a Bolt (FetcherBolt). In Storm, Spouts are the processes in charge of generating work, whereas Bolts may perform any kind of operation on that work. In this case we will make the FeedSpout emit the feeds to process (feed URLs) and the FetcherBolt download and parse them.

Let’s see the FeedSpout code:

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// SimpleSpout is a small convenience base class from the example
// project that stores the collector passed to open().
public class FeedSpout extends SimpleSpout {

  private static final long serialVersionUID = 1L;
  Queue<String> feedQueue = new LinkedList<String>();
  String[] feeds;

  public FeedSpout(String[] feeds) {
    this.feeds = feeds;
  }

  @Override
  public void nextTuple() {
    // Emit the next pending feed, using the feed URL itself as the
    // message ID so ack() / fail() tell us which feed finished.
    String nextFeed = feedQueue.poll();
    if(nextFeed != null) {
      collector.emit(new Values(nextFeed), nextFeed);
    }
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    super.open(conf, context, collector);
    for(String feed: feeds) {
      feedQueue.add(feed);
    }
  }

  @Override
  public void ack(Object feedId) {
    // The feed was fully processed: re-enqueue it to be fetched again.
    feedQueue.add((String)feedId);
  }

  @Override
  public void fail(Object feedId) {
    // Processing failed or timed out: re-enqueue the feed to retry.
    feedQueue.add((String)feedId);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("feed"));
  }
}

In order to simplify this example, we have made the FeedSpout receive the full list of feeds in its constructor, although these feeds could come from a database or anywhere else. There is an in-memory queue from which the feeds to be processed are pulled one by one. Storm, with its control layer, is in charge of deciding when to call the nextTuple() method. We can configure, for instance, the maximum number of emitted tuples that may be pending (not yet acked or failed) at any time; Storm stops calling nextTuple() when that limit is reached, so intermediate queues will never be flooded.
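As a small configuration sketch, the pending-tuple limit just mentioned can be set through Storm's Config when submitting the topology (the value 100 here is illustrative):

```java
Config conf = new Config();
// Cap the number of tuples the spout may have "in flight"
// (emitted but not yet acked or failed). Once the cap is hit,
// Storm stops calling nextTuple() until tuples complete.
conf.setMaxSpoutPending(100);
```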

Furthermore, Spouts receive ACK and FAIL messages. This mechanism is called the "Reliability API" and allows us to know whether a certain message has been fully processed or whether, on the contrary, it has failed. In this case we use this API to get feedback from the Bolts, so we know when a feed can be processed again.

Now, let’s see the code for the FetcherBolt:

import java.net.URL;
import java.util.Date;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FeedFetcher;
import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;

public class FetcherBolt implements IRichBolt {

  private static final long serialVersionUID = 1L;
  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    // Download and parse the feed with ROME, then emit one
    // "listing" tuple per entry.
    FeedFetcher feedFetcher = new HttpURLFeedFetcher();
    String feedUrl = input.getStringByField("feed");
    try {
      SyndFeed feed = feedFetcher.retrieveFeed(new URL(feedUrl));
      for(Object obj : feed.getEntries()) {
        SyndEntry syndEntry = (SyndEntry) obj;
        Date entryDate = getDate(syndEntry, feed);
        collector.emit(new Values(syndEntry.getLink(), entryDate.getTime(), syndEntry.getDescription().getValue()));
      }
      // Tell the Spout that this feed was processed successfully.
      collector.ack(input);
    } catch(Throwable t) {
      t.printStackTrace();
      // Tell the Spout that this feed failed, so it can be retried.
      collector.fail(input);
    }
  }

  // Prefer the entry's updated date, then its published date,
  // falling back to the feed-level published date.
  private Date getDate(SyndEntry syndEntry, SyndFeed feed) {
    if(syndEntry.getUpdatedDate() != null) {
      return syndEntry.getUpdatedDate();
    }
    if(syndEntry.getPublishedDate() != null) {
      return syndEntry.getPublishedDate();
    }
    return feed.getPublishedDate();
  }

  @Override
  public void cleanup() {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("link", "date", "description"));
  }
}

In this case the Bolt is quite simple: it just receives a feed to process, downloads it and parses it. Finally, it emits “listings” (tuples with “link”, “date” and “description” fields). These listings could be processed by another layer of Bolts, in order to compute real-time statistics, for example, or to save them in a database.

We can also see how we use the "Reliability API" to communicate the success or failure of processing a feed (the ack() and fail() methods). How does this work, exactly? Storm has some processes called "ackers" that efficiently track the execution tree of each tuple. If we want this to happen, we have to do what is called "anchoring" (emitting a tuple associated with a message ID). When the tuple fails in a certain Bolt, Storm knows which Spout to send the FAIL message to. The same happens on a timeout (30 seconds by default): a FAIL is sent to the Spout. With this, we can implement quite sophisticated fail-over policies when needed.
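As an illustrative fragment (not taken from the example project), anchoring looks slightly different in Spouts and Bolts: a Spout attaches a message ID to the emit, while a Bolt passes along the input tuple it is processing.

```java
// In a Spout: anchor by providing a message ID with the emit.
// Storm will later call ack(feedUrl) or fail(feedUrl) with it.
collector.emit(new Values(feedUrl), feedUrl);

// In a Bolt: anchor by passing the input tuple, so a failure in
// any downstream Bolt is traced back to the originating Spout.
collector.emit(input, new Values(link, date, description));
collector.ack(input);
```

In the FetcherBolt above, the emits are deliberately unanchored: the Bolt acks or fails the input tuple itself, so reliability tracking stops at that stage.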

Finally, the last thing to do is to define the topology itself (FeedTopology).
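The actual FeedTopology ships with the downloadable project; as a hedged sketch of what the wiring might look like (assuming the backtype.storm 0.x API with String component IDs; the component names, parallelism values, and feed URL below are illustrative), it boils down to connecting the Spout to the Bolt and running the result:

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class FeedTopology {

  public static void main(String[] args) throws Exception {
    // Placeholder feed URL; the real project uses Craigslist feeds.
    String[] feeds = { "http://www.example.com/rss.xml" };

    TopologyBuilder builder = new TopologyBuilder();
    // One FeedSpout instance emitting feed URLs...
    builder.setSpout("feeds", new FeedSpout(feeds), 1);
    // ...distributed randomly across several FetcherBolt instances.
    builder.setBolt("fetchers", new FetcherBolt(), 4)
           .shuffleGrouping("feeds");

    Config conf = new Config();
    conf.setMaxSpoutPending(100); // bound the number of in-flight feeds

    // Run locally for a few seconds, as the example project does.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("feed-topology", conf, builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
  }
}
```

On a real cluster, LocalCluster would be replaced by StormSubmitter, and scaling up becomes a matter of raising the parallelism hints and adding Supervisor machines.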

The code for this example can be downloaded here and can be run against a set of Craigslist XML feeds. In this project I have also included one more Bolt (ListingBolt) that keeps the last 10 listings parsed by the FetcherBolt, according to the date in the feed. To run this project you just need to git clone and mvn install. The topology runs for a few seconds as part of the build, and you will see the last 10 listings on screen, updated every second.

Conclusions

We have seen what Storm is and we have shown a specific use case. This use case would be quite complex to implement without Storm because we would need to deploy a potentially multi-level “Queue / Workers” architecture with all the complexities this involves. With Storm, all we need to do is define the topology and execute it in a Storm cluster. We can use this deployment script to do this. If we want to scale up our solution, we just need to add more machines to the cluster.

Storm is to real-time computation what Hadoop is to batch computation.

Finally, we should point out that this project contains several useful Storm examples.



4 Comments on "Real-time feed processing with Storm"

  1. The link gives me a 404 “http://www.datasalt.com/2012/01/real-time-feed-processing-with-storm/link-goes-here”

  2. Great article! I’ve been considering this problem for a startup with similar architecture, doing analytics. A year ago I was a grid computing noobie and as a result we have a nasty “rake-oriented” ruby on rails task hierarchy that strangles MySQL in every way you’re not supposed to.

    I’ve mostly come to the same conclusions; it seems Storm is the ideal model to use for feed collection. One really important thing for us is the concept of “document replay”, the ability to not only accept realtime feeds and process them in parallel, but also run that same data back through the system if you need to generate new stats that require historical documents. With our existing architecture (and even some parallel alternatives), you end up having to use up extra time writing one-off code just to bring some new metric up-to-date. Storm seems well-suited to this if you build a little framework around it that focuses on archiving documents for future use and dealing with use cases related to quota limits (and that framework is what I am spec’ing right now…hopefully I can write on it one day!).

  3. Thank you, very useful article to understand the difference between Storm and Hadoop.

  4. Jan, thanks for reporting, this is the link that was missing: http://www.datasalt.com/2011/10/scalable-vertical-search-engine-with-hadoop/

    Daniel, looks like maybe Hadoop would make sense for your historical repository and stats generation. Storm could append data to HDFS that will be later processed in batch.
