
Hive + Splout SQL for a social media reporting webapp: A Big Data love story

(This is the second post in a three-part series presenting the native integration of Splout SQL 0.2.2 with the main Hadoop processing tools: Cascading, Hive and Pig.)

In this post we present an example Big Data use case: analyzing tweets and reporting consolidated, meaningful statistics to Twitter users through an interactive, low-latency webapp. For that we will marry Hive (an open-source data warehousing solution that enables easy analysis and summarization over Hadoop) with Splout SQL, a high-performance, low-latency partitioned SQL database for Hadoop. We will build a very simple, yet scalable, analysis tool similar to the Tweet archivist, and we will do it without writing any code. The tool will provide historical mentions, summarized hashtags and popular tweets for every actor in the input dataset.

Twitter data

Twitter data is nowadays a representative, trendy example of Big Data. Various companies analyze it and make sense of it, from social reputation sites like Klout or PeerIndex to social analytics sites like Topsy or Sprinklr. A few of them, like Gnip or DataSift, have access to the full firehose and resell the raw tweet stream.

Building tools that are able to analyze such an enormous dataset is a challenge. In this post we will use a scalable processing tool (Hive) and a scalable, low-latency serving database (Splout SQL), and we will see how well they integrate with each other.

Requirements

In order to follow the steps in this post you should:

  • Have Hadoop and Hive installed on your computer. We tested this with Hadoop CDH3 and Hive 0.10.0.
  • Have “hive-site.xml” properly configured and Hive’s conf/ in CLASSPATH / HADOOP_CLASSPATH. Note that “hive-default.xml” is deprecated and may not work.
  • Have Splout SQL on your computer. To use Splout SQL you just need to download its latest distribution, unzip it and start both the QNode and DNode daemons. You can find more information in the “Getting started” section of the official webpage.

Preparing the input data (tweets)

We can download some example tweets from Twitter’s REST API using “curl”. We will use some popular accounts such as “BBCNews” or “Reuters”. We also need to put each tweet on its own line so that Hive processes each tweet independently. For that we will use “sed”.

  mkdir temp
  cd temp
  curl "http://api.twitter.com/1/statuses/user_timeline.json?screen_name=TechCrunch&count=200" | sed "s/},{/}\n{/g" > techcrunch_tweets.json
  curl "http://api.twitter.com/1/statuses/user_timeline.json?screen_name=reuters&count=200" | sed "s/},{/}\n{/g" > reuters_tweets.json
  curl "http://api.twitter.com/1/statuses/user_timeline.json?screen_name=BBCNews&count=200" | sed "s/},{/}\n{/g" > bbcnews_tweets.json
  curl "http://api.twitter.com/1/statuses/user_timeline.json?screen_name=barackobama&count=200" | sed "s/},{/}\n{/g" > barackobama_tweets.json
  curl "http://api.twitter.com/1/statuses/user_timeline.json?screen_name=ladygaga&count=200" | sed "s/},{/}\n{/g" > ladygaga_tweets.json
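One caveat with the sed one-liner: the API returns a single JSON array, so the first line keeps a leading “[” and the last line a trailing “]”, which may prevent those two tweets from being parsed later on. A minimal sketch of a variant that also strips the outer brackets, assuming GNU sed (for “\n” in the replacement) and assuming that the sequence “},{” only appears between top-level tweets; split_tweets is a hypothetical helper name:

```shell
# Split a JSON array of tweets into one JSON object per line.
# Assumes GNU sed and that "},{" only occurs between top-level tweets.
split_tweets() {
  sed -e 's/^\[//' -e 's/]$//' -e 's/},{/}\n{/g'
}

# Example with a tiny inline array instead of a real API response:
printf '%s\n' '[{"id":1,"text":"a"},{"id":2,"text":"b"}]' | split_tweets
```

In the commands above it would take the place of the sed call, for example “curl ... | split_tweets > reuters_tweets.json”.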

We are going to use Hadoop with Hive, so both need to be installed. Once Hadoop is running, we can create a folder in HDFS and upload all the JSON files we have obtained:

  hadoop fs -mkdir tweets
  hadoop fs -put *.json tweets/

Finally, we open Hive and create an external table pointing to this “tweet container” folder (substituting “myuser” as needed):

  CREATE EXTERNAL TABLE tweets
    (jsonData STRING)
  ROW FORMAT DELIMITED FIELDS
    TERMINATED BY '\n'
    LOCATION '/user/myuser/tweets';

JSON + Hive

There are various ways of using JSON data in Hive. The one we use here is the most flexible: we create the table with a single column (jsonData) and navigate the JSON as needed, depending on the query we want to execute. To do that we use the native Hive function json_tuple. First of all, we add the needed contrib JAR to the classpath (substitute %HIVE_HOME% and %HIVE_VERSION% as needed):

ADD JAR %HIVE_HOME%/lib/hive-contrib-%HIVE_VERSION%.jar;

The following query will parse the field “text” in each JSON:

  SELECT
    a.text
  FROM tweets
    LATERAL VIEW json_tuple(tweets.jsonData, 'text') a AS text;

The following query extracts the first mention of every tweet that has mentions:

  SELECT
    regexp_extract(a.text, "(@[^ ]*)", 1)
  FROM tweets
    LATERAL VIEW json_tuple(tweets.jsonData, 'text') a AS text;
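To preview what this pattern captures before running it in Hive, the same expression can be tried locally. A rough sketch using grep (first_mention is a hypothetical helper, and grep’s -o option is assumed to be available). Note that “[^ ]*” only stops at a space, so trailing punctuation is captured along with the mention:

```shell
# Print the first "@..." token of a tweet text, mirroring the pattern (@[^ ]*).
first_mention() {
  printf '%s\n' "$1" | grep -o '@[^ ]*' | head -n 1
}

first_mention 'Thanks @FLOTUS, and @OFA for the support'   # prints "@FLOTUS,"
```

This is one reason why the mentions table created later splits the text on non-word characters instead of relying on this pattern.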

Creating stats views with Hive

We will create three views: the mentions view, the retweets view and the hashtags view. The mentions view records every interaction between two Twitter users based on mentions in tweets. The retweets view holds all tweets that have a “retweet_count” > 0, and the hashtags view holds the number of times each hashtag has been used by each user. Creating each of these views is as easy as executing one Hive query, as we will see.

The mentions view can be created by the following query:

  CREATE TABLE mentions AS
    SELECT
      foo.screenname as mentioner,
      substr(wordTable.word, 2) as mentioned,
      to_date(from_unixtime(unix_timestamp(substr(foo.ttime, 5), 'MMM dd HH:mm:ss ZZZZZ yyyy'))) as daydate
    FROM (
      SELECT
        c.screenname,
        a.ttime,
        split(a.text, '[^a-zA-Z_0-9@#]') AS wordarray
      FROM tweets
        LATERAL VIEW json_tuple(jsonData, 'created_at', 'text') a AS ttime, text
        LATERAL VIEW json_tuple(jsonData, 'user') b AS user
        LATERAL VIEW json_tuple(b.user, 'screen_name') c AS screenname
      ) foo
      LATERAL VIEW explode(foo.wordarray) wordTable AS word
    WHERE wordTable.word LIKE '@%';

Now if we perform a simple SELECT … LIMIT 10 we will see what the table looks like:

Mentioner | Mentioned | Daydate
BarackObama | OFA | 2013-03-13
BarackObama | FLOTUS | 2013-03-13
BarackObama | PressSec | 2013-03-12

Each row has a mentioner, the user who was mentioned (mentioned) and the date on which the mention happened.
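The split / explode / LIKE steps of the query can be mimicked on a single tweet text with standard Unix tools, which helps when checking the splitting rule against real tweets. This is only an approximation of the Hive query, not part of the pipeline, and extract_mentions is a hypothetical name:

```shell
# Emulate: split(text, '[^a-zA-Z_0-9@#]') -> explode -> LIKE '@%' -> substr(word, 2)
extract_mentions() {
  tr -c 'a-zA-Z_0-9@#' '\n' |   # every other character becomes a line break
    grep '^@' |                  # keep only the words starting with "@"
    cut -c 2-                    # drop the leading "@"
}

printf '%s\n' 'Follow @OFA today, with @FLOTUS!' | extract_mentions
```

For this input the sketch prints OFA and FLOTUS on separate lines, one per mention, just as the query emits one row per mention.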

The retweets table is fairly simple and can be created by the following query:

CREATE TABLE retweets AS
  SELECT
    c.screenname as user,
    a.text as tweet,
    to_date(from_unixtime(unix_timestamp(substr(a.time, 5), 'MMM dd HH:mm:ss ZZZZZ yyyy'))) as daydate,
    a.retweetcount as retweetcount
  FROM tweets
    LATERAL VIEW json_tuple(jsonData, 'created_at', 'text', 'retweet_count') a AS time, text, retweetcount
    LATERAL VIEW json_tuple(jsonData, 'user') b AS user
    LATERAL VIEW json_tuple(b.user, 'screen_name') c AS screenname
  WHERE a.retweetcount > 0;

The resulting table has a user (the author), the tweet text, the day on which the tweet was posted and the retweet count:

User | Tweet | Daydate | Retweetcount
BarackObama | Follow @OFA today for live updates from the founders’ summit of Organizing for Action. | 2013-03-13 | 194
BarackObama | Obama needs your help to pass the policies Americans voted for last fall. Say you’re in: … | 2013-03-13 | 508

Finally, the top hashtags table:

CREATE TABLE tophashtags AS
  SELECT
    foo2.screenname AS user,
    foo2.word AS word,
    COUNT(*) as count
  FROM (
    SELECT
      foo.screenname as screenname,
      wordTable.word as word FROM (
        SELECT
          c.screenname,
          a.time,
          split(a.text, "[^a-zA-Z_0-9@#]+") AS wordarray
        FROM tweets
          LATERAL VIEW json_tuple(jsonData, 'created_at', 'text') a AS time, text
          LATERAL VIEW json_tuple(jsonData, 'user') b AS user
          LATERAL VIEW json_tuple(b.user, 'screen_name') c AS screenname
      ) foo
      LATERAL VIEW explode(foo.wordarray) wordTable AS word
    WHERE wordTable.word LIKE '#%'
  ) foo2
  GROUP BY foo2.screenname, foo2.word;

This simply produces a view with the user, the hashtag used and the total number of times that user has used it:

User | Word | Count
BarackObama | #WeDemandAVote | 29
BarackObama | #JobsNotCuts | 28
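As a quick sanity check on a small sample, similar counts can be approximated locally before trusting the Hive output. The sketch below counts hashtags in plain tweet texts read from standard input. It ignores the per-user grouping and is only meant for spot checks (hashtag_counts is a hypothetical name):

```shell
# Count hashtags in tweet texts (one per line on stdin), most frequent first.
hashtag_counts() {
  tr -c 'a-zA-Z_0-9@#' '\n' |   # same splitting rule as the Hive query
    grep '^#' |                  # keep hashtags only
    sort | uniq -c |             # group identical hashtags and count them
    sort -k1,1nr -k2 |           # highest count first, ties ordered by hashtag
    awk '{ print $2, $1 }'       # print "hashtag count"
}

printf '%s\n' 'Act now #JobsNotCuts' '#WeDemandAVote today' 'Again: #WeDemandAVote' | hashtag_counts
```

Here sort and uniq -c play the role of GROUP BY and COUNT(*) in the query.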

Deploying the views to Splout SQL

All this valuable information is of little use unless we can query it in sub-second time. We want to provide a per-user panel where each user can navigate through historical information, and we want to do it in a way that is both scalable and flexible. Rather than precomputing the panel, we will simply deploy the Hive tables to Splout SQL: a scalable, low-latency SQL database which will feed the agile frontend.

In order to deploy the views generated in Hive we will create a deployment descriptor JSON file called “hive_splout_example.json”. We can create this file in the local filesystem, in the Splout SQL installation directory:

{
	"name": "hive_splout_example",
	"nPartitions": 2,
	"partitionedTables": [{
		"name": "retweets",
		"partitionFields": "user",
		"tableInputs": [{
			"inputType": "HIVE",
			"hiveTableName": "retweets",
			"hiveDbName": "default"
		}]
	},{
		"name": "mentions_of_me",
		"partitionFields": "mentioned",
		"tableInputs": [{
			"inputType": "HIVE",
			"hiveTableName": "mentions",
			"hiveDbName": "default"
		}]
	},{
		"name": "mentions_by_me",
		"partitionFields": "mentioner",
		"tableInputs": [{
			"inputType": "HIVE",
			"hiveTableName": "mentions",
			"hiveDbName": "default"
		}]
	},{
		"name": "tophashtags",
		"partitionFields": "user",
		"tableInputs": [{
			"inputType": "HIVE",
			"hiveTableName": "tophashtags",
			"hiveDbName": "default"
		}]
	}]
}

In this JSON file we describe our Splout SQL tablespace, which is made up of four tables. Note how we always partition by the user. Note also how we import the same Hive table (mentions) twice: this is because we are interested both in the mentions of a certain user and in the mentions that this user makes of others. Because the data is partitioned, being able to query both datasets for the same user requires importing the mentions table twice, partitioned once by mentioner and once by mentioned.

To deploy the tablespace to Splout SQL we can use the following commands, executed from the Splout SQL installation directory:

hadoop jar splout-hadoop-*-hadoop.jar generate -tf file:///`pwd`/hive_splout_example.json -o out-hive_splout_example
hadoop jar splout-hadoop-*-hadoop.jar deploy -q http://localhost:4412 -root out-hive_splout_example -ts hive_splout_example

Querying / visualizing the results

Finally, we can go to Splout SQL’s administration panel to see that the tablespace has been deployed correctly, and perform some test queries. As an example, we can obtain the hashtag fingerprint for the user “BarackObama” as the picture illustrates:

[Image: ObamaHashtagsQuery]

To feed this information into a webapp we can use the REST API: http://localhost:4412/api/query/hive_splout_example?sql=SELECT * FROM tophashtags WHERE user = 'BarackObama' ORDER BY count DESC;&key=BarackObama

This returns a JSON response:

{
  millis: 10,
  error: null,
  result: [
  {
    count: 29,
    word: "#WeDemandAVote",
    user: "BarackObama"
  },
  {
    count: 28,
    ...
  }]
}
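Typed into a browser, the URL above works because the browser encodes the spaces and quotes. From a script, the query has to be URL-encoded explicitly. A sketch using curl’s -G and --data-urlencode options, assuming the QNode address and the “sql” and “key” parameters shown above (splout_query, SPLOUT_QNODE and CURL are hypothetical names introduced here):

```shell
# Query a Splout SQL tablespace through the QNode REST API.
# $1 = tablespace, $2 = partition key, $3 = SQL statement
# SPLOUT_QNODE and CURL are hypothetical overrides, not Splout SQL settings.
splout_query() {
  ${CURL:-curl} -sS -G "${SPLOUT_QNODE:-http://localhost:4412}/api/query/$1" \
    --data-urlencode "sql=$3" \
    --data-urlencode "key=$2"
}

# Usage, with the daemons from the Requirements section running:
# splout_query hive_splout_example BarackObama \
#   "SELECT * FROM tophashtags WHERE user = 'BarackObama' ORDER BY count DESC;"
```

Setting CURL=echo prints the arguments instead of sending the request, which is handy for a dry run.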

The following table shows what queries we can issue to Splout SQL in order to populate a panel for each user:

What | How (SQL)
Top people mentioning me | SELECT mentioner, COUNT(*) as mentions FROM mentions_of_me WHERE mentioned = 'BarackObama' GROUP BY mentioner ORDER BY mentions DESC LIMIT 10;
Top people I mention | SELECT mentioned, COUNT(*) as mentions FROM mentions_by_me WHERE mentioner = 'BarackObama' GROUP BY mentioned ORDER BY mentions DESC LIMIT 10;
Top hashtags I use | SELECT * FROM tophashtags WHERE user = 'BarackObama' ORDER BY count DESC LIMIT 10;
Most retweeted tweets from me | SELECT * FROM retweets WHERE user = 'BarackObama' ORDER BY retweetcount DESC LIMIT 10;

We could also do more complex things, such as displaying a timeline of mentions (we have the date of every mention), computing aggregates by period (week, month, year, …) and so on.
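For instance, a monthly timeline of mentions only needs a GROUP BY over the date column. The sketch below prints such a query for a given user. It assumes that “daydate” reaches Splout SQL as a “yyyy-MM-dd” string (as in the sample rows shown earlier), so that its first seven characters identify the month. mentions_per_month_sql is a hypothetical helper:

```shell
# Print a SQL query that counts the mentions of one user per month.
# $1 = screen name; single quotes are doubled so the name is a valid SQL literal.
mentions_per_month_sql() (
  user=$(printf '%s' "$1" | sed "s/'/''/g")
  cat <<EOF
SELECT substr(daydate, 1, 7) AS month, COUNT(*) AS mentions
FROM mentions_of_me
WHERE mentioned = '$user'
GROUP BY month
ORDER BY month;
EOF
)

mentions_per_month_sql BarackObama
```

The printed statement can be pasted into the administration panel or sent through the REST API with “BarackObama” as the partition key.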

Conclusion

We managed to build a completely scalable Twitter summary tool, and we did so without writing a single line of code! This was possible because of the integration between Hive and Splout SQL. Hive provides an easy, interactive interface to Hadoop which allows us to seamlessly populate “Big Data views”. Splout SQL, in turn, can import those views from Hive and index them so that users can query them with sub-second latency, even under high load. Having SQL both in the backend and in the frontend allows us to be very flexible and agile. Don’t hesitate to try this example and give us feedback!



10 Comments on "Hive + Splout SQL for a social media reporting webapp: A Big Data love story"

  1. Amazing, this is just awesome, i love what you guys are doing at datasalt

  2. bindu says:

    Hi
    I have this error while executing the example csv in splout using command. could u please solve this.
    #hadoop jar splout-hadoop-0.2-hadoop.jar com.splout.db.hadoop.Driver –input /user/hdfs/city.csv –output /user/hdfs –tablespace city_pby_country_code –table city –separator , –escape \\ –quotes \”\”\” –nullstring \\N –schema “id:int,name:string,country_code:string,district:string,population:int” –partitionby country_code –partitions 4

    Exception in thread “main” java.lang.NoSuchMethodError: com.splout.db.hadoop.Driver.driver([Ljava/lang/String;)V
    at com.splout.db.hadoop.Driver.main(Driver.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)

  3. Pere says:

    Hello bindu,

    What version of Hadoop are you using? There is some incompatibility between Driver classes in Hadoop 1.0 or 2.0. Can you try with latest Splout release 0.2.3: http://search.maven.org/#browse%7C-1223220190 ? We solved that issue there. Use MR1 for Hadoop 1.0 (CDH3, 0.20.X, etc) or 2.0 for Hadoop 2.0.X, CDH4, etc.

  4. Pere says:

    Also, Splout 0.2 is quite old and doesn’t even have native Hive integration.

  5. bindu says:

    Thank you Pere..

    I am using Splout release 0.2.3. But I still have this error
    Exception in thread “main” java.io.IOException: Error opening job jar: splout-distribution-0.2.3-hadoop.jar
    at org.apache.hadoop.util.RunJar.main(RunJar.java:135)
    Caused by: java.io.FileNotFoundException: splout-distribution-0.2.3-hadoop.jar (No such file or directory)
    at java.util.zip.ZipFile.open(Native Method)
    at java.util.zip.ZipFile.<init>(Unknown Source)
    at java.util.zip.ZipFile.<init>(Unknown Source)
    at java.util.jar.JarFile.<init>(Unknown Source)
    at java.util.jar.JarFile.<init>(Unknown Source)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:133)

  6. Hello bindu,

    How did you end up with a -distribution- JAR ? The distribution is a .tar.gz that needs to be decompressed first. After decompressing you will have everything including the runnable JAR.
    You can follow the getting started: http://sploutsql.com/gettingstarted.html

    And tell us if you have any further problems. You can also write to the Splout SQL users list: https://groups.google.com/forum/?fromgroups#!forum/sploutdb-users

  7. Hello Bindu,

    I think you are not in the right folder. You must be in the folder where Splout SQL is installed. In this folder, there must be a file with one of the following names: splout-hadoop-0.2.3-SNAPSHOT-hadoop-mr1.jar or splout-hadoop-0.2.3-SNAPSHOT-hadoop-mr2.jar (depending on the version you are using).

  8. bindu says:

    Hi Pere

    I hve hadoop version hadoop-2.0.0-cdh4.0.1 and I used splout-hadoop-0.2.3-hadoop-mr2.jar and i have this error again. Do u have any clear tutorial for this other than user guide

    13/04/22 22:06:09 INFO mapreduce.Job: Running job: job_1366648491576_0001
    13/04/22 22:06:12 INFO mapreduce.Job: Job job_1366648491576_0001 running in uber mode : false
    13/04/22 22:06:12 INFO mapreduce.Job: map 0% reduce 0%
    13/04/22 22:06:12 INFO mapreduce.Job: Job job_1366648491576_0001 failed with state FAILED due to: Application application_1366648491576_0001 failed 1 times due to AM Container for appattempt_1366648491576_0001_000001 exited with exitCode: 1 due to:
    .Failing this attempt.. Failing the application.
    13/04/22 22:06:12 INFO mapreduce.Job: Counters: 0
    Exception in thread “main” com.splout.db.hadoop.TablespaceGenerator$TablespaceGeneratorException: Error executing generation Job
    at com.splout.db.hadoop.TablespaceGenerator.executeViewGeneration(TablespaceGenerator.java:478)
    at com.splout.db.hadoop.TablespaceGenerator.generateView(TablespaceGenerator.java:141)
    at com.splout.db.hadoop.SimpleGeneratorCMD.run(SimpleGeneratorCMD.java:253)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at com.splout.db.hadoop.SimpleGeneratorCMD.main(SimpleGeneratorCMD.java:261)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at com.datasalt.pangool.PangoolDriver$ProgramDescription.invoke(PangoolDriver.java:55)
    at com.datasalt.pangool.PangoolDriver.driver(PangoolDriver.java:128)
    at com.splout.db.hadoop.Driver.main(Driver.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)

  9. Hello bindu,

    Please report your case with as many details as you can (full execution, commands used, logs, etc) to the user list: https://groups.google.com/forum/?fromgroups#!forum/sploutdb-users

    Many thanks,
