In this tutorial for Cascalog, we are going to create part of the back-end for a simplified version of a Facebook-like news feed. In doing so we are going to walk through an end-to-end example of running Cascalog on a production cluster. If you're new to Cascalog, you should first look at the introductory tutorials here and here.
The code and sample data for the example presented in this tutorial can be found on GitHub.
A news feed ranks events happening in your social network. Our program will take as input two sources of data. The first is "follows" relationships, which are stored in text format.
Follows relationships are 2-tuples of (username, username) with fields separated by whitespace.
Our second source of data is "action" data which is also stored in text format:
nathan status=good 1273094927000
nathan birthday 1273026922000
david newjob 1273096922000
david travelling 1273094927000
bob status=tired 1273026922000
dan basketball-captain 1273029922000
Actions are 3-tuples of (username, action, time) separated by whitespace.
To rank the actions in someone's network, we will use a simple scoring function that produces a higher score for people with more followers and a lower score for an older action.
Our query will produce a list of up to five of the highest-scoring events in each person's network, like so:
mike (("alice" "status=good" 1273091927000) ("alice" "rock-climbing" 1273084927000) ("bob" "status=tired" 1273026922000) ("bob" "engaged" 1272029922000))
jai (("danielle" "married" 1273095924348) ("danielle" "cooking-a-storm" 1273094927000) ("mike" "tennis" 1273084927000) ("mike" "hacking" 1273084827000))
david (("nathan" "status=great" 1273096922000) ("nathan" "status=good" 1273094927000) ("vijay" "inventing" 1273099927000) ("vijay" "dancing" 1273094927000) ("alice" "status=good" 1273091927000))
Parsing the input files
Let's start by writing subqueries to read and parse our data sources, beginning with "follows":
(defn follows-data [dir]
  (let [source (hfs-textline dir)]
    (<- [?p ?p2] (source ?line) (re-parse [#"[^\s]+"] ?line :> ?p ?p2)
        (:distinct false))))
"follows-data" is a function that takes the path to a directory as input and returns a subquery that emits the parsed tuples. First, the function binds the variable "source" to the result of "(hfs-textline dir)". "hfs-textline" returns a Tap that reads text data from HDFS and emits each line of text as a 1-tuple.
Next, we define our subquery that will parse the line of text into its constituent fields. The subquery reads "?line" from "source" and then parses ?line into fields using the "re-parse" function. "re-parse" is a parameterized function that takes in a regular expression as an argument. In this case, we match on any contiguous run of non-whitespace characters with the regular expression #"[^\s]+".
Finally, we use "(:distinct false)" to tell the query not to eliminate duplicates from the results. Deduplication is unnecessary here, so skipping it saves a reduce step.
Next, let's write the subquery that will read and parse the action data:
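To see what that regular expression matches, here is a minimal plain-Clojure sketch that runs outside Cascalog. The helper name "split-fields" is hypothetical and stands in for the tokenizing step only; it is not how "re-parse" is implemented, and the second input line is made up for illustration.

```clojure
;; Plain-Clojure stand-in for the tokenizing step (hypothetical helper,
;; not part of Cascalog).
(defn split-fields
  "Return every maximal run of non-whitespace characters in line, in order."
  [line]
  (vec (re-seq #"[^\s]+" line)))

(split-fields "nathan status=good 1273094927000")
;; => ["nathan" "status=good" "1273094927000"]

;; Leading, trailing, and repeated whitespace do not produce empty fields.
(split-fields "  mike \t alice  ")
;; => ["mike" "alice"]
```

Note that every field comes back as a string, including the timestamp.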
(defn to-long [num] (Long/parseLong num))
(defn action-data [dir]
  (let [source (hfs-textline dir)]
    (<- [?p ?a ?t] (source ?line) (re-parse [#"[^\s]+"] ?line :> ?p ?a ?time-str)
        (to-long ?time-str :> ?t) (:distinct false))))
This function is very similar to the "follows-data" function, except it produces three fields and not two. We also define a custom function "to-long" in order to convert the timestamp from its string representation to the proper type.
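The conversion matters because the scoring step later does arithmetic on the timestamp. As a rough sketch in plain Clojure, assuming a hypothetical helper "parse-action-line" that mirrors what the subquery does for a single line:

```clojure
(defn to-long [num] (Long/parseLong num))

;; Hypothetical helper for illustration; the real work happens inside the
;; Cascalog subquery, one tuple at a time.
(defn parse-action-line
  "Split an action line into [person action time], with time as a long."
  [line]
  (let [[person action time-str] (re-seq #"[^\s]+" line)]
    [person action (to-long time-str)]))

(parse-action-line "david newjob 1273096922000")
;; => ["david" "newjob" 1273096922000]

;; With longs we can subtract timestamps directly (milliseconds apart).
(- (to-long "1273096922000") (to-long "1273094927000"))
;; => 1995000
```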
"action-data" and "follows-data" are similar in that they both parse a text file into fields. Let's factor out the parsing logic and simplify the code by making a helper function "textline-parsed" that will take in arguments [dir num-fields] and parse the text files at "dir" into "num-fields" fields:
(defn textline-parsed [dir num-fields]
  (let [outargs (v/gen-nullable-vars num-fields)
        source (hfs-textline dir)]
    (<- outargs (source ?line) (c/re-parse [#"[^\s]+"] ?line :>> outargs)
        (:distinct false))))
"textline-parsed" is our first example of a dynamic query and requires some explanation. Since the output fields of the subquery are dynamic according to "num-fields", we have to generate those variable names dynamically. "cascalog.vars", here referred as "v", provides a function "gen-nullable-vars" for producing a vector of some number of unique variables. Variables can be represented as strings in Cascalog as long as they are prefixed with a "?" or "!". We generate a vector of variables using "gen-nullable-vars" and bind the vector to the local "outargs".
Now that we have generated our variables, we just need to use them within the query. Using them as output fields is easy - we just put the generated vector where we normally specify our output fields. To use the generated variables as the output of the re-parse function, we make use of the ":>>" keyword. The standard ":>" keyword requires variables to be spliced into the predicate, but in this case we want to provide a vector of variables without splicing them. ":>>" allows you to do exactly this.
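To make the idea of generated variable names concrete, here is a small plain-Clojure sketch. "gen-vars-sketch" is a hypothetical stand-in written for this illustration; it is not the code in "cascalog.vars", and the exact names the library produces may look different. The only point is that a variable can be an ordinary string with the right prefix, and that each generated name is unique.

```clojure
;; Hypothetical stand-in for illustration only; NOT cascalog.vars.
(defn gen-vars-sketch
  "Return a vector of n distinct strings, each prefixed with \"!\"."
  [n]
  (vec (repeatedly n #(str (gensym "!field")))))

(def outargs (gen-vars-sketch 3))

(count outargs)                              ;; => 3
(apply distinct? outargs)                    ;; => true
(every? #(.startsWith ^String % "!") outargs) ;; => true
```

Because "outargs" is an ordinary vector, it can be built at runtime from "num-fields" and then handed to the query as a whole.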
Using our new "textline-parsed" helper function, we can rewrite "follows-data" and "action-data" like so:
(defn follows-data [dir] (textline-parsed dir 2))
(defn action-data [dir]
  (let [source (textline-parsed dir 3)]
    (<- [?person ?action ?time] (source ?person ?action ?time-str)
        (to-long ?time-str :> ?time) (:distinct false))))
All in all, the parsing logic comes out to 12 lines of code.
News feed query
Next is the logic for the query that will produce the news feed. We start with a custom aggregator to produce a JSON-like string out of the ranked actions from someone's network:
(defbufferop mk-feed [tuples]
  [(pr-str (take 5 tuples))])
"defbufferop" defines a "buffer aggregator". A buffer takes in all the tuples for a group as one argument, and returns a sequence of tuples as results. In this case, our query will have scored and sorted the tuples beforehand, so we simply take the first five tuples and serialize them using "pr-str".
Next, we define our scoring function for a particular action:
(defn action-score [now-ms folls time-ms]
  (let [days-delta (div (- now-ms time-ms) 86400000)]
    (div folls (+ days-delta 1))))
"action-score" takes in as input the current time, the number of followers of the person responsible for the action, and the time of the action.
Finally, we have the function that builds and runs our news feed query:
(defn compute-news-feed [output-tap follows-dir action-dir]
  (let [follows (follows-data follows-dir)
        action (action-data action-dir)
        follower-count (<- [?p ?c] (follows ?p2 ?p) (c/count ?c))]
    (?<- output-tap [?p ?feed] (follows ?p ?p2) (action ?p2 ?action ?time)
         (follower-count ?p2 ?folls)
         (action-score (System/currentTimeMillis) ?folls ?time :> ?score)
         (:sort ?score) (:reverse true)
         (mk-feed ?p2 ?action ?time :> ?feed))))
In our let binding, we use our "follows-data" and "action-data" functions to get a hold of the parsed tuples. We then define a subquery "follower-count" to give us the number of followers for each person.
In the news feed query, for each person ?p2 followed by ?p, we retrieve ?p2's follower count and actions. We then run our "action-score" function to produce a score for each action. Finally, we use the predicates "(:sort ?score) (:reverse true)" so that the "mk-feed" aggregator receives the actions in order of most interesting to least interesting. The news feed query comprises 15 lines of code.
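If it helps to see the same logic without Hadoop, here is a rough in-memory sketch of what the query computes. It is not how Cascalog executes the query; it only mirrors the steps (count followers, join follows to actions, score, sort descending, take five). The names "news-feed", "fdiv", and the "sample-" data are hypothetical, and "fdiv" assumes "div" is floating-point division.

```clojure
(defn fdiv [a b] (/ (double a) b)) ; assumed stand-in for div

(defn action-score [now-ms folls time-ms]
  (fdiv folls (+ (fdiv (- now-ms time-ms) 86400000) 1)))

(defn news-feed
  "follows: [follower followee] pairs; actions: [person action time] triples.
   Returns a map of person -> serialized top-five feed."
  [follows actions now-ms]
  (let [follower-count    (frequencies (map second follows))
        actions-by-person (group-by first actions)
        ;; Join: one row per (follower, action by someone they follow).
        scored (for [[p p2] follows
                     [_ action time] (actions-by-person p2)]
                 {:owner p
                  :score (action-score now-ms (follower-count p2) time)
                  :tuple [p2 action time]})]
    (into {}
          (for [[p rows] (group-by :owner scored)]
            [p (->> rows (sort-by :score >) (map :tuple) (take 5) pr-str)]))))

;; Made-up sample data: alice has three followers, bob has one.
(def sample-now-ms 1273100000000)
(def sample-follows [["mike" "alice"] ["mike" "bob"] ["jai" "alice"] ["david" "alice"]])
(def sample-actions [["alice" "rock-climbing" 1273013600000] ; one day old
                     ["alice" "status=good" 1273100000000]
                     ["bob" "status=tired" 1273100000000]])

(get (news-feed sample-follows sample-actions sample-now-ms) "mike")
;; => "([\"alice\" \"status=good\" 1273100000000] [\"alice\" \"rock-climbing\" 1273013600000] [\"bob\" \"status=tired\" 1273100000000])"
```

Here alice's fresh action scores 3.0, her day-old action 1.5, and bob's fresh action 1.0, which gives the order shown.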
Running on a production cluster
The cascalog-demo project has sample data in the "data/" subdirectory. Copy the sample data onto your cluster to "/tmp/follows" and "/tmp/action".
Next, run "lein uberjar" to create a jar containing the program with all its dependencies. Since the demo code specifies :gen-class and has a main method, we can run it just like any other Hadoop program. To run the query on a cluster and output the results in text format to "/tmp/results", run:
hadoop jar cascalog-demo-standalone.jar cascalog_demo.demo /tmp/follows /tmp/action /tmp/results
Since the uberjar has Clojure within it, you can also run the code from the REPL with the following commands:
hadoop jar cascalog-demo-standalone.jar clojure.lang.Repl
user=> (use 'cascalog-demo.demo) (use 'cascalog.api)
user=> (compute-news-feed (hfs-textline "/tmp/results") "/tmp/follows" "/tmp/action")
You can replace the output tap of "hfs-textline" with "hfs-seqfile" to output in sequence file format, stdout to print to standard output, or another tap to output in some other format or to some other destination.
All custom operations must be compiled into the uberjar in order to use them on a cluster, which means you won't be able to use custom operations defined in the REPL in production queries. However, this restriction goes away when Hadoop is run in local mode, which means the REPL is still great for development of custom operations.
It's worth noting that there are a number of optimizations that can be done to speed up the news feed query. The join between the "follows" and "action" data is a very large join and much can be done to minimize the blowup in tuples. For starters, very old actions can be filtered outright before the join. Other, more complex optimizations are also possible.
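As a sketch of the first optimization, the filter below drops old actions before any join happens. It is written in plain Clojure over in-memory tuples; in the real query the same test would be a filter predicate on the action data. The name "recent-actions", the one-day cutoff, and the "now" value are assumptions chosen for the example.

```clojure
(defn recent-actions
  "Keep only [person action time-ms] tuples no older than max-age-days."
  [actions now-ms max-age-days]
  (let [cutoff (- now-ms (* max-age-days 86400000))]
    (filter (fn [[_ _ time-ms]] (>= time-ms cutoff)) actions)))

(recent-actions [["nathan" "status=good" 1273094927000]
                 ["bob" "status=tired" 1273026922000]
                 ["bob" "engaged" 1272029922000]]
                1273100000000 ; assumed "now"
                1)            ; keep at most one day of history
;; => (["nathan" "status=good" 1273094927000] ["bob" "status=tired" 1273026922000])
```

Since old actions already receive low scores, dropping them early trades a small loss in completeness for far fewer tuples flowing into the join.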
With the material from this tutorial, you should now have the ability to write and run non-trivial queries in a production environment. You can download the code for this tutorial from GitHub. Please leave any questions in the Cascalog Google group or in the comments below.
You should follow me on Twitter here.