
Tips for Optimizing Cascading Flows

Here are a few tips for optimizing your Cascading flows. I also recommend checking out "7 Tips for Improving MapReduce Performance" for general MapReduce optimization tips.

1. Filter data as early as possible

Less data equals less work. Sometimes you can easily filter data out early by moving filters earlier in your flow. Other times, it can be more complicated.

One important use case that can be heavily optimized is querying a set of records for matches against a large set of keys - also known as a batch query. You can read more about the mechanics of batch queries here. You can find the source code for batch querying here. The implementation utilizes a bloom filter to filter data out of the flow before using a join to complete the logic for the query.
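The bloom filter is what makes the batch query cheap: a negative answer from the filter is definitive, so non-matching records can be dropped map-side, and the join afterwards only has to clean up false positives. As a rough illustration of the idea (a hand-rolled sketch, not the implementation linked above):

```java
import java.util.BitSet;

// Minimal Bloom filter sketch. Keys that miss the filter are definitely
// absent and can be dropped immediately; keys that hit may be false
// positives, which the downstream join removes.
public class BloomSketch {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    public BloomSketch(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // double hashing: derive k indexes from two base hashes of the key
    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // force the second hash odd
        return Math.abs((h1 + i * h2) % numBits);
    }

    public void add(String key) {
        for (int i = 0; i < numHashes; i++) bits.set(index(key, i));
    }

    public boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) return false; // definitely absent
        }
        return true; // possibly present (may be a false positive)
    }
}
```

All the query keys get added to the filter up front; the map phase then drops any record whose key returns false from mightContain, shrinking what reaches the join.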

Another broad class of use cases involves keeping data in memory within each task for querying. For example, say you have a bunch of key-value pairs in HDFS, and you want all the pairs whose key does not appear in a separate list of 30,000 keys. The naive way to do this would be a left join, keeping all key-value pairs that joined to null. Unfortunately, this funnels all the data through a reducer.

Since the key set we're joining against is so small, we can instead put the entire key set in memory and do a map-only job. This is a little tricky. Your first attempt might be to pass the keys into the constructor of a custom function and store them as an instance variable, like so:

pipe = new Each(pipe, new MyCustomFilter(keys));

You'll be in for a rude surprise, though. Under the hood, Cascading serializes all that data into the JobConf. Hadoop does not play well with big JobConfs, and this may bring down your JobTracker.

What you want to do is read the keys in dynamically for every task. To do this, use Hadoop's distributed cache to distribute the keys to each machine, and then use JVM reuse to minimize the number of times the keys get read into memory. Here's a template for how to do that:

//custom filter
protected static class MyFilter extends BaseOperation implements Filter {
    //the set needs to be static to take advantage of JVM reuse
    protected static Set<String> _keys = null;

    public boolean isRemove(FlowProcess process, FilterCall call) {
        if (_keys == null) {
            //locate the localized key file via the task's JobConf
            //(the cast to the Hadoop-mode FlowProcess is Hadoop-specific)
            Path[] files = DistributedCache.getLocalCacheFiles(
                ((HadoopFlowProcess) process).getJobConf());
            if (files.length != 1) {
                throw new RuntimeException("Expected one path " +
                    "in the Distributed cache: there were " + files.length);
            }
            _keys = loadKeys(FileSystem.getLocal(new Configuration()), files[0]);
        }
        //remove the pair when its key appears in the 30,000-key set
        return _keys.contains(call.getArguments().getString(0));
    }
}

//flow setup
Map properties = new HashMap();
properties.put("mapred.job.reuse.jvm.num.tasks", "-1");
properties.put("mapred.cache.files", pathToKeysOnHdfs);

Flow flow = new FlowConnector(properties).connect(...);
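The loadKeys helper is left to the reader. Assuming the cached file is a plain text file with one key per line (an assumption - the post doesn't specify the format), a minimal sketch using plain java.io against the already-localized file looks like this:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical loadKeys: reads one key per line from a local file.
// The template above passes a Hadoop FileSystem and Path instead of a
// String path; the I/O logic is the same either way.
public class KeyLoader {
    public static Set<String> loadKeys(String localPath) throws IOException {
        Set<String> keys = new HashSet<String>();
        BufferedReader in = new BufferedReader(new FileReader(localPath));
        try {
            String line;
            while ((line = in.readLine()) != null) {
                line = line.trim();
                if (!line.isEmpty()) keys.add(line); // skip blank lines
            }
        } finally {
            in.close();
        }
        return keys;
    }
}
```

Because the set is static, only the first task in each reused JVM pays this I/O cost; subsequent tasks find _keys already populated.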

2. Eliminate reduces by using MultiGroupBy

Every GroupBy or CoGroup in your flow causes another round of reducing. Sometimes you can combine multiple GroupBys and CoGroups into a single reduce, which can lead to huge performance gains.

To combine reduces together, you'll want to make use of "MultiGroupBy", a new pipe type I hacked together and originally wrote about here. The source code is linked on that page as well. MultiGroupBy lets you do a GroupBy on multiple pipes that differ in their field composition. For example, you may have one pipe containing ("id", "age", "gender") and another pipe containing ("id", "friend_id"), and want the age, gender, and friend count for each "id". With straight Cascading, you would do something like:

Pipe p1; //"id", "age", "gender"
Pipe p2; //"id", "friend_id"

p2 = new GroupBy(p2, new Fields("id"));
p2 = new Every(p2, new Count(new Fields("count")), new Fields("id", "count"));

Pipe joined = new CoGroup(p1, new Fields("id"), p2, new Fields("id"),
new Fields("id", "age", "gender", "id2", "count"));
joined = new Each(joined, new Fields("id", "age", "gender", "count"),
new Identity());

This requires a GroupBy and a CoGroup which results in two jobs. With MultiGroupBy, you can do the join and the aggregation in a single step - check the link for more details.

Chris Wensel said he might release a natively supported MultiGroupBy for Cascading v1.1.

3. Define serialization tokens for custom writables

Cascading encodes the type of every value when serializing tuples. If you're using custom writables, such as a writable for a Thrift object, the default Cascading encoding will be to write the fully qualified class name followed by the serialized object. For fully qualified names like "org.yours.FooWritable", you're looking at 20 bytes of overhead for every value. To fix this, you can define mappings from an integer "token" to a classname in your JobConf. Classes that have a "token" defined will use the token instead of the classname, saving a lot of overhead.

Token values must be greater than 128 - numbers below are reserved by Cascading. I recommend defining the mappings in your "core-site.xml" file wherever you launch your jobs (or "hadoop-site.xml" if you're still on Hadoop 0.18) so that the mappings are picked up for all jobs automatically. Here's an example definition:

<property>
  <name>cascading.serialization.tokens</name>
  <value>130=org.yours.FooWritable</value>
</property>

4. Dealing with unbalanced joins

This tip will benefit joins that have a small amount of values per key for one side of the join and a large amount of values per key for the other side of the join. You can think of a Cascading join as being a nested for-loop, like:

for(int i=0; i < leftSide.length; i++) {
    for(int j=0; j < rightSide.length; j++) {
        Tuple t = leftSide[i] + rightSide[j]; //pseudocode: emit the joined tuple
    }
}

Cascading stores each side of the join in a "SpillableTupleList". A SpillableTupleList keeps tuples in memory up to a certain threshold and spills to disk beyond it. In the nested for-loop, the right side of the join gets iterated over once for every element on the left side. When the left side is small and the right side is large, this is hugely inefficient: you end up doing disk operations over and over to re-read the spilled tuples on the right side of the join. It's much more efficient to keep the set of tuples that fits in memory on the inside of the for-loop.
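To make the asymmetry concrete, suppose one side of the join has 1,000,000 tuples spilled to disk for a key and the other has 30 tuples in memory (made-up sizes). The inner list is re-read once per element of the outer list, so which side sits inside determines how many disk reads happen. A toy accounting sketch:

```java
// Counts tuple reads in the nested-loop join for a single key.
// The output size is the same either way; what differs is how many
// times the spilled (on-disk) side has to be re-read.
public class JoinCost {
    // tuples read from the inner list: one full pass per outer element
    public static long innerReads(long outerLen, long innerLen) {
        return outerLen * innerLen;
    }

    // tuples read from the outer list: each element is read exactly once
    public static long outerReads(long outerLen) {
        return outerLen;
    }
}
```

With the spilled million-tuple side on the outside, it is streamed from disk once (1,000,000 reads) while the 30-tuple in-memory list is re-scanned cheaply. Flipped around, the spilled list becomes the inner one and gets re-read 30 times - 30,000,000 disk reads for the same output.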

Doing this in code is really simple - just put the smaller part of your join on the right side of your CoGroup, like so:

joined = new CoGroup(largeSide, new Fields("key"), 
smallSide, new Fields("key"), ...);

There are a few symptoms to look out for when determining whether this tip will help. First, if one reducer in your CoGroup job takes far longer than the rest, you may have one particularly unbalanced key: its huge value list causes tons of spilling, which makes that one task drag on. Second, look for lots of messages in the reducer task logs of the form "spilling tuple list to file number X" - an indication that at least one side of the join has lots of values per key.

You should follow me on Twitter here.

Reader Comments



Regarding the tip for custom serialization tokens, it's also worth noting that the cpu time consumed when you don't have them goes through the roof. It's because a fair amount of time gets spent deserializing and converting the string name, as well as mapping into some internal structures to get the real class back out.

Bottom line, use those tokens!

January 4, 2010 | Unregistered Commenterbryanduxbury
