Dealing with skewed key sizes in Cascading

Rapleaf indexes data from a wide variety of sources about all sorts of people. As a result, we end up storing far more data about some people than about others. For instance, the average person has around a hundred friends, whereas Ashton Kutcher’s Twitter account has millions of followers.

This variance in the size of a “person” in our system causes interesting problems when we use CoGroup or GroupBy in such a way that all of a person’s data is reduced together. In these cases, we sometimes end up with massively skewed task sizes and runtimes. Since there are generally only a few people with millions and millions of datapoints, instead of all reducers taking roughly the same amount of time, a few take much, much longer than the rest. This leads to performance degradation and, at times, outright job failure when an oversized task becomes unable to report its progress in time.

The intuitive way to solve this problem is to decrease the amount of data a given reducer is responsible for processing. However, doing so is not as easy as just increasing the number of reducers. Since the problem is not the aggregate amount of data that goes to an individual reducer, but the sheer number of values that just a few keys have, more reducers would just make the successful tasks smaller and the unsuccessful tasks more tightly focused on the few bad keys.
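To see why adding reducers alone does not help, here is a minimal Python simulation (not Cascading code). It assumes a hypothetical data set of 1,000 ordinary keys with 100 records each plus one hot key with 1,000,000 records, and a stand-in partitioner that sends each key to crc32(key) modulo the reducer count.

```python
import zlib

def reducer_loads(key_counts, num_reducers):
    """Return the number of records each reducer receives when every
    record for a key goes to the same reducer (hash partitioning)."""
    loads = [0] * num_reducers
    for key, count in key_counts.items():
        # Stand-in partitioner: a stable hash of the key, modulo reducer count.
        partition = zlib.crc32(key.encode("utf-8")) % num_reducers
        loads[partition] += count
    return loads

# Hypothetical data: 1,000 ordinary keys and one very hot key.
key_counts = {f"user{i}": 100 for i in range(1000)}
key_counts["celebrity"] = 1_000_000

for num_reducers in (10, 100, 1000):
    loads = reducer_loads(key_counts, num_reducers)
    median = sorted(loads)[len(loads) // 2]
    print(f"{num_reducers:>5} reducers: largest task = {max(loads):,}, "
          f"median task = {median:,}")
```

However many reducers you add, the largest task can never drop below the hot key's 1,000,000 records; only the typical task shrinks.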

Instead, what you need to do is actually spread the data over more keys so that it will be distributed amongst reducers more evenly. The way that we have accomplished this is by doing what I’ve been thinking of as a “shuffle join”. Basically, you take your natural key and add some additional variation to it in order to form a synthetic key, and then do your joining or grouping on the synthetic key.

Let’s look at how to do this for the CoGroup use case in detail. First, pick some shuffle factor, which we’ll call N. Choose it so that sending about 1/N of the data for a given natural key to a single reducer is manageable. Next, you form your synthetic keys in all of your pipes. I think of my pipes as being of two different types: keys and values. Every value must find its key, but the keys don’t need to see every value. To form synthetic keys for your key pipe, emit N copies of each tuple, one for each of the numbers from 0 to N-1, with that number added alongside the natural key. To form synthetic keys in the values pipe, emit each tuple once with its natural key plus a random number between 0 and N-1. (Make sure you are using a deterministic means of selecting random numbers as discussed here.) Finally, perform your CoGroup on these pipes, specifying the join fields to be the fields that compose the synthetic key.
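The steps above can be sketched in plain Python. This is a simulation of the idea, not Cascading code: the function names, the sample data, and the choice to derive the "random" number from a SHA-256 hash of the record are all assumptions made for illustration. Hashing the record is one way to get a repeatable salt, but note that identical duplicate records would all receive the same salt under this scheme.

```python
import hashlib
from collections import Counter, defaultdict

N = 4  # shuffle factor (hypothetical value for this sketch)

def stable_salt(record, n):
    """Pick a salt in [0, n) from a hash of the record, so a re-run task
    assigns the same salt to the same record."""
    digest = hashlib.sha256(repr(record).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % n

def salt_key_pipe(tuples, n):
    """Key pipe: emit n copies of each tuple, one per salt 0..n-1."""
    for key, payload in tuples:
        for salt in range(n):
            yield (key, salt), payload

def salt_value_pipe(tuples, n):
    """Value pipe: emit each tuple once, tagged with a single salt."""
    for key, payload in tuples:
        yield (key, stable_salt((key, payload), n)), payload

def join_on_synthetic_key(key_pipe, value_pipe):
    """Inner join on (natural key, salt); the salt is dropped from the output."""
    keys_by_synthetic = defaultdict(list)
    for synthetic_key, payload in key_pipe:
        keys_by_synthetic[synthetic_key].append(payload)
    for (natural_key, salt), value in value_pipe:
        for key_payload in keys_by_synthetic.get((natural_key, salt), []):
            yield natural_key, key_payload, value

# Hypothetical inputs: one profile per person, many follower records for one person.
profiles = [("ashton", "profile-A"), ("bob", "profile-B")]
followers = [("ashton", f"follower{i}") for i in range(1000)] + [("bob", "follower-x")]

joined = list(join_on_synthetic_key(salt_key_pipe(profiles, N),
                                    salt_value_pipe(followers, N)))

# Size of each synthetic-key group on the value side.
group_sizes = Counter(synthetic_key for synthetic_key, _ in salt_value_pipe(followers, N))
print(len(joined), "joined tuples; largest group:", max(group_sizes.values()))
```

The joined output contains the same tuples as a join on the natural key alone, but no single group has to hold all 1,000 of the hot key's values.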

The result of this extra setup is that roughly 1/N of your “values” will get each of the numbers 0 to N-1, and all of the keys will be “copied” N times, once with each value of the additional 0 to N-1 field. When it goes into the join, each of the synthetic keys will get roughly 1/N of the values for the natural key. For most reasonable values of N, this will cause your outlier keys to be split into manageable chunks, and your reduce tasks will become homogeneous again, giving you back consistent runtimes. After the CoGroup pipe, you’re free to strip the extra field off your tuple to get back your original tuple and go about your business.
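As a rough back-of-the-envelope aid for picking N, the arithmetic can be written down as below. The helper names and all the numbers are hypothetical. Because the salt is random, each group receives about 1/N of a key's values on average, not exactly 1/N.

```python
import math

def shuffle_factor(largest_key_records, target_records_per_group):
    """Smallest N such that 1/N of the largest key fits within the target group size."""
    return max(1, math.ceil(largest_key_records / target_records_per_group))

def key_pipe_size_after_salting(key_pipe_tuples, n):
    """Every key-side tuple is copied n times, so the key pipe grows n-fold."""
    return key_pipe_tuples * n

# Hypothetical numbers: the hottest key has 2,000,000 values and we want
# groups of roughly 100,000 values or fewer.
n = shuffle_factor(2_000_000, 100_000)
print("N =", n)
print("key pipe grows from 50,000 to", key_pipe_size_after_salting(50_000, n), "tuples")
```

The second helper is a reminder of the cost: a larger N shrinks the worst group but multiplies the size of the key pipe.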

This same strategy can be used to shuffle up GroupBy pipes as well, and more easily at that. Instead of worrying about key and value pipes, just treat everything like a value pipe and stick in the random number. Each large natural key will then show up under up to N synthetic keys, which can land on up to N different reducers.
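A minimal Python sketch of the GroupBy variant follows. It is a simulation, not Cascading code; the function name and sample data are hypothetical, and a seeded random.Random stands in for whatever deterministic source of random numbers you use.

```python
import random
from collections import defaultdict

def salted_group_by(records, n, seed=42):
    """Group (key, value) records on the synthetic key (key, salt).
    A fixed seed keeps the salt assignment repeatable between runs."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    for key, value in records:
        groups[(key, rng.randrange(n))].append(value)
    return groups

# Hypothetical input: one hot key and one ordinary key.
records = [("celebrity", i) for i in range(10_000)] + [("bob", 0), ("bob", 1)]
groups = salted_group_by(records, n=8)

# Print the size of every (key, salt) group.
for (key, salt), values in sorted(groups.items()):
    print(key, salt, len(values))
```

The hot key ends up in several groups of similar size, while a small key like "bob" occupies at most as many groups as it has records.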

The unfortunate downside of this strategy is that it’s not suitable for every possible use case, since any Every pipe downstream won’t get all the values for the natural key. If you are planning to do some aggregation, you need to consider if it can be done piecemeal or not. For instance, if you are just counting the records for each key, you can emit the value you get as a sort of subtotal and just put all the subtotals together after the fact, either with a little Java code or another set of pipes. (After all, the output of this step should be much smaller now, and is likely totally manageable.)
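For the counting case, the two stages might look like the following plain-Python sketch. The function names and data are hypothetical, and `i % N` is only a stand-in for the random salt.

```python
from collections import Counter

def partial_counts(salted_records):
    """Stage 1: count records per synthetic key (natural key, salt).
    Each entry is a subtotal for one slice of a natural key."""
    return Counter((key, salt) for key, salt, _value in salted_records)

def merge_counts(subtotals):
    """Stage 2: drop the salt and add the subtotals for each natural key."""
    totals = Counter()
    for (key, _salt), count in subtotals.items():
        totals[key] += count
    return totals

N = 4  # shuffle factor (hypothetical)
# Hypothetical salted input; i % N is a stand-in for the random salt.
salted_records = [("celebrity", i % N, f"v{i}") for i in range(1000)]
salted_records += [("bob", i % N, f"w{i}") for i in range(3)]

subtotals = partial_counts(salted_records)
totals = merge_counts(subtotals)
print(dict(totals))
```

Counts, sums, minimums and maximums all merge this way. An average needs the sum and the count carried separately through the first stage, and a count of distinct values cannot be obtained by simply adding subtotals.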

This entry was posted in Cascading, MapReduce.

2 Comments

  1. Posted March 8, 2010 at 10:21 pm

    Yup, we ran into this exact same issue. We came up with a different solution for our case, though, because we needed to do aggregations: we actually wrote a custom hashing function that ensured a more even distribution of keys and then joined on those without loss of precision in our aggregates. (Hard to explain in a blog comment.)

    The reason I’m replying, though, is that another boon for Cascading performance was the simple tweak of setting cascading.cogroup.spill.threshold to a higher value than its default of 10000. (Bryan, I know you’ve tweaked this as per your reply on usenet, but thought I’d echo it again here :) )

    This trick means that even if you do have some skew in your keys, you can at least keep those extra-hard-working reducers from spilling to disk and grinding to a halt. For example, if your hashing routine guarantees that your biggest key is 25,000 records, the default threshold will spill that key to disk and slow things down dramatically, even though it could in theory fit in RAM (depending on the structure of your tuples). The trick is to set the threshold high enough that you stay in RAM. Beware, though: if you set it too high, there’s nothing stopping Cascading from exceeding the heap and throwing OutOfMemoryErrors. So you need a pretty good understanding of your data to use this wisely. In our case, using Elastic MapReduce, we turned a $1,800 job into an $80 one just by applying this once we had a hashing routine solid enough that we could reason about an upper bound on the key skew. (The reason for the big jump is that the bottleneck quickly shifted from CPU to I/O, and on EC2 that means you can move from c1.xlarges down to c1.mediums without much throughput loss, a big cost savings.)

  2. Posted March 11, 2010 at 10:14 am

    Btw, in 1.1, the leftmost side will never spill to disk. So if you are seeing spills when joining two streams, swap the order in which they are handed to the CoGroup.

    ckw
