Monday, December 28, 2009

The mathematics behind Hadoop-based systems

I wish I had known this a year ago. Now, with some simple mathematics I can finally answer:

  • Why doesn't the speed of my workflow double when I double the amount of processing power?
  • Why does a 10% failure rate cause my runtime to go up by 300%?
  • How does optimizing out 30% of my workflow runtime cause the runtime to decrease by 80%?
  • How many machines should I have in my cluster to be adequately performant and fault-tolerant?

All of these questions are neatly answered by one simple equation:

Runtime = Overhead / (1 - {Time to process one hour of data})

We will derive this equation in a moment. First, let's briefly discuss what I mean by "Hadoop-based system."[1] A common use case of Hadoop is running a workflow that processes a continuous stream of incoming data. The workflow runs in a "while(true)" loop, and each iteration of the workflow processes the data that accumulated since the last iteration.

The inspiration for the following analysis can be summarized in a simple example. Let's say you have a workflow that runs in 12 hours, and therefore processes 12 hours of data each iteration. Now let's say you enhance the workflow to do some additional analysis, and you estimate the analysis will add two hours to the processing time of your current workflow. Here's the rub: your workflow runtime may increase by a lot more than two hours. It may increase by ten hours, by one hundred hours, or the workflow might spiral out of control and get longer and longer each iteration, ad infinitum.

Why?

The issue is that you increased the runtime of a workflow that operated on 12 hours of data to 14 hours. That means the next time the workflow runs, there will be 14 hours of data to process. Since the next iteration has more data, it will take longer to run. Which means the next iteration will have even more data, and so on.
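
To make this feedback loop concrete, here is a minimal simulation sketch. The numbers are illustrative assumptions (2 hours of original overhead and 50 minutes of dynamic time per hour of data, so that the original workflow runs in 12 hours on 12 hours of data), not measurements from a real workflow:

    # Each iteration processes the data that accumulated during the previous run.
    def simulate(overhead, p, hours_of_data, iterations=8):
        for i in range(iterations):
            runtime = overhead + p * hours_of_data    # T = O + P * H (derived below)
            print(f"iteration {i}: {hours_of_data:5.1f}h of data -> {runtime:5.1f}h runtime")
            hours_of_data = runtime                   # the next run sees this much data

    # The new analysis adds 2 hours of overhead (4 total): the runtime starts at 14 hours
    # and creeps up each iteration toward a new stable point of 24 hours, not 14.
    simulate(overhead=4, p=10/12, hours_of_data=12)

    # If the extra work had instead pushed P to 1 or above, every iteration would be
    # longer than the last and the workflow would never stabilize.
    simulate(overhead=2, p=1.05, hours_of_data=12)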

To determine if and when the runtime stabilizes, let's do some simple math. First, let's write an equation for the runtime of a single iteration of a workflow:

Runtime = Overhead + {Time to process one hour of data} * {Hours of data}

Overhead refers to any constant time spent in the workflow. For example, the time it takes to start a job counts as overhead. The time to distribute files using distributed cache counts as overhead. Any time spent that is independent of the amount of data you have will fall under this "overhead" category.

"Time to process one hour of data" refers to the dynamic time in the workflow. This is the amount of time you spend processing the actual data, ignoring the constant time overhead. If every hour of data adds half an hour to your runtime, this value will be 0.5. If every hour of data adds two hours to your runtime, this value will be 2.

For conciseness, let's rewrite the above equation using variables:

T = O + P * H

To determine the stable runtime of the workflow, we need to find out the point at which the runtime of the workflow is equal to the number of hours of data it processes. To do this, we simply plug in T=H and solve for T:

T = O + P * T
T = O / (1 - P)

This is the equation I showed earlier. As you can see, the stable runtime of a workflow is linearly proportional to the amount of overhead in the workflow. So if you're able to decrease overhead by 25%, your workflow runtime will also decrease by 25%. However, the stable runtime depends non-linearly on the dynamic processing rate "P". One implication of this is that there are diminishing returns on performance gains with each machine added to the cluster.
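
A quick numerical sketch of that asymmetry (the 10 hours of overhead and the P values below are arbitrary examples):

    # Stable runtime: T = O / (1 - P)
    def stable_runtime(overhead, p):
        assert p < 1, "with P >= 1 the workflow never stabilizes"
        return overhead / (1 - p)

    # Overhead is linear: cutting O by 25% cuts the stable runtime by 25%.
    print(f"{stable_runtime(10, 0.8):.1f} hours")    # 50.0 hours
    print(f"{stable_runtime(7.5, 0.8):.1f} hours")   # 37.5 hours

    # P is not: shaving 0.1 off a high P halves the runtime, while the same
    # improvement to a low P barely registers.
    print(f"{stable_runtime(10, 0.9):.1f} hours")    # 100.0 hours
    print(f"{stable_runtime(10, 0.8):.1f} hours")    # 50.0 hours
    print(f"{stable_runtime(10, 0.2):.1f} hours")    # 12.5 hours
    print(f"{stable_runtime(10, 0.1):.1f} hours")    # 11.1 hours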

With this equation we can answer all the questions I posed at the beginning of this article. Let's go through them one at a time:

Why doesn't the speed of my workflow double when I double the amount of processing power?

Doubling the number of machines will cut your "time to process one hour of data" (P) in half.[2] The effect this has on your runtime depends entirely on your former value of P. Let's show this mathematically: call the runtime of your workflow before doubling the cluster size "T1", and the runtime after doubling the cluster size "T2". This gives us two equations:

T1 = O / (1 - P)
T2 = O / (1 - P/2)

The speedup will be given by T2 / T1, the ratio of the new runtime to the old runtime. This gives us:

T2 / T1 = (1 - P) / (1 - P/2)
T2 / T1 = (2 - 2P) / (2 - P)
T2 / T1 = 1 - P / (2 - P)

Plotting this, we get the following graph, with the original P on the x-axis and the speedup on the y-axis[3]:

This graph says it all. If your P was really high, say 54 minutes of dynamic time spent per hour of data, then doubling the cluster size will cause the new runtime to be 18% of the original runtime, a speedup of 82%! This is a very counter-intuitive result, and I highly recommend thinking carefully about the mechanics of how this happens.

Then again, if your former P wasn't that high, say 6 minutes of dynamic time spent per hour of data, then doubling the cluster size will barely affect the runtime, only by about 5% or so. This makes sense, since the runtime is dominated by overhead and dynamic time accounts for very little of it.
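
Those two data points fall straight out of the ratio above; here is a small sketch that reproduces them (the sample values of P are arbitrary):

    # Runtime ratio after doubling the cluster: T2/T1 = (1 - P) / (1 - P/2)
    def doubling_ratio(p):
        return (1 - p) / (1 - p / 2)

    for minutes in (6, 30, 54):    # dynamic minutes spent per hour of data
        p = minutes / 60
        ratio = doubling_ratio(p)
        print(f"P = {p:.2f}: new runtime is {ratio:.0%} of the old, a speedup of {1 - ratio:.0%}")
    # P = 0.10: new runtime is 95% of the old, a speedup of 5%
    # P = 0.50: new runtime is 67% of the old, a speedup of 33%
    # P = 0.90: new runtime is 18% of the old, a speedup of 82%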

Why does a 10% failure rate cause my runtime to go up by 300%?

This question addresses the property of workflow stability. In a large cluster, you'll always have various kinds of machine failures, so it's important that spikes in the failure rate don't kill the performance of mission-critical systems. The analysis for this question looks very similar to the last one, except this time we're worsening the dynamic processing rate rather than improving it. A 10% task failure rate means that we'll need to execute about 11% more tasks to get our data processed.[4] Since the number of tasks is proportional to the amount of data we have, this means our "time to process one hour of data" (P) will increase by 11%. Like the last question, let's call T1 the runtime before the failures start happening and T2 the runtime afterwards:

T1 = O / (1 - P)
T2 = O / (1 - 1.11*P)
T2 / T1 = (1 - P) / (1 - 1.11*P)

Plotting this, we get:

As you can see, the effect failures have on the runtime grows dramatically as the "extra capacity" in your cluster shrinks, so it's extremely important to keep your P low. We can also see from this plot that the higher your P, the more you risk entering a "cycle of doom" when failure rates rise.[5]
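
To put numbers on this, here is a sketch using the same 10% failure rate; it shows where the 300% figure in the question comes from and where the "cycle of doom" begins (the sample P values are arbitrary):

    # A failure rate f inflates the work per hour of data by 1 / (1 - f),
    # which for f = 0.10 is the ~1.11 factor used above.
    def failure_ratio(p, failure_rate=0.10):
        p_degraded = p / (1 - failure_rate)
        if p_degraded >= 1:
            return float("inf")    # the workflow never stabilizes
        return (1 - p) / (1 - p_degraded)

    for p in (0.6, 0.8, 0.87, 0.92):
        print(f"P = {p:.2f}: runtime grows by {failure_ratio(p) - 1:.0%}")
    # P = 0.60: runtime grows by 20%
    # P = 0.80: runtime grows by 80%
    # P = 0.87: runtime grows by 290% (roughly the 300% from the question)
    # P = 0.92: runtime grows by inf% (the cycle of doom: it never stabilizes)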

How does optimizing out 30% of my workflow runtime cause the runtime to decrease by 80%?

This question is actually what caused me to figure out this model for workflow runtimes. I had a ridiculous bottleneck in a workflow that was causing about 10 hours of overhead,[6] and the total runtime of the workflow was 30 hours. After I optimized out that bottleneck (which comprised 30% of the runtime), the runtime dropped like a rock until it stabilized at 6 hours (an 80% reduction). Using our model, we can determine why this happened:

30 = O / (1 - P)
6 = (O - 10) / (1 - P)
O = 12.5, P = 0.58

So those 10 hours accounted for 80% of the overhead in the workflow, which explains the speedup.
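
For completeness, here is the arithmetic behind those numbers; subtracting the second equation from the first eliminates O:

    # 30 - 6 = 10 / (1 - P)  =>  1 - P = 10 / 24
    one_minus_p = 10 / 24
    overhead = 30 * one_minus_p
    print(f"O = {overhead:.1f}h, P = {1 - one_minus_p:.2f}")             # O = 12.5h, P = 0.58
    print(f"share of overhead optimized away: {10 / overhead:.0%}")      # 80%
    print(f"new stable runtime: {(overhead - 10) / one_minus_p:.1f}h")   # 6.0h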

How many machines should I have in my cluster to be adequately performant and fault-tolerant?

This is very much an exercise in cost-benefit analysis. We've seen that increasing the size of the cluster to improve performance has diminishing returns, and once P ("time to process one hour of data") drops below 40 minutes (2/3), performance will improve sub-linearly with added machines.[7] We've also seen how important it is to keep P low, or else an increase in failures or other jobs utilizing the cluster can severely impact your runtimes. From there, you'll just have to run the numbers to determine the optimal number of machines for your application.[8]
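
Here is one way to derive that break-even point, using the same doubling analysis as earlier (a sketch, with the sample P values chosen arbitrarily):

    # Linear scaling means doubling the machines (halving P) halves the stable runtime:
    #   (1 - P/2) / (1 - P) = 2   =>   P = 2/3, i.e. 40 minutes of dynamic time per hour of data
    def doubling_speedup(p):
        return (1 - p / 2) / (1 - p)    # old stable runtime divided by new stable runtime

    print(f"{doubling_speedup(2 / 3):.2f}")   # 2.00 -> exactly linear at the break-even point
    print(f"{doubling_speedup(0.5):.2f}")     # 1.50 -> sub-linear: twice the machines, 1.5x the speed
    print(f"{doubling_speedup(0.9):.2f}")     # 5.50 -> super-linear: doubling the cluster pays off hugely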

You should follow me on Twitter here.

UPDATE: Check out the follow-up to this post.

[1] Actually, this equation applies to any system that processes a continuous stream of data in batches.

[2] This assumes your processing is fully distributed and that there are no central points of contention, which is generally true in a Hadoop-based workflow. One case where this won't hold is if all your tasks communicate with a central database. Adding more machines also has small adverse effects on both overhead (more machines to manage) and the data processing rate (mappers need to split their output across more reducers), but for the purposes of this analysis we can ignore them.

[3] Plotted with FooPlot.

[4] Let's say you used to process the data in 100 tasks. With a 10% failure rate, 10 of those tasks will fail; when you re-run them, 9 will succeed and one will fail again. Hence, 111 tasks instead of 100, an 11% increase. In general, a failure rate f inflates the number of tasks by a factor of roughly 1/(1 - f).

[5] Or an increase in other stuff running on the cluster, like one-off queries or other workflows.

[6] It had to do with Berkeley DB causing extreme fragmentation on ext3 filesystems.

[7] I didn't show this directly, but it follows from the model; the sketch in that section works through the derivation.

[8] If you wanted, you could make the model much more comprehensive. For example, this model doesn't distinguish between new incoming data and existing data that may need to be queried (and grows every iteration). This variable will certainly affect your long-term scaling needs and is important to keep in mind. For the purposes of gaining an intuition about the performance of these workflows, though, it's unnecessary.

Reader Comments

I think the equation is the so-called Amdahl's Law, right?

December 28, 2009 | Unregistered CommenterForest

@Forest: Amdahl's Law is about how much you can speed up a single run of a program with parallelism. My equation models the stable runtime of an iterative workflow. The equations do look similar, although the variables mean different things.

December 28, 2009 | Unregistered CommenterNathan Marz
