Thrift and pseudo-RDF schemas

BackType’s Nathan Marz recently wrote a really great post about using Thrift and an RDF-like schema to get type-safe, extensible, high-performance schemas for use in Hadoop environments. He really hit the nail on the head describing the use pattern and the positives and negatives.

A variation on this approach is something we’ve been doing at Rapleaf for a few years now. Our heavy use of Thrift, including for this style of data storage, is what led us to contribute patches for union support, amongst many other features. I’m really glad to see that others are making as much use of these features as we are.

One thing he’s doing that we’re not is treating different types of “relations” as first-class separate data types. Instead, we have a single DataUnit class with all the possible attributes on it. This means that the information we can store about different node types isn’t as constrained. This has been suitable for us up to this point because all of our data is about individuals. I could certainly see how, as we add different kinds of nodes (like Facebook Groups or corporations) to our system, his strategy would be a big consistency improvement.

Additionally, our data model includes an extra field of general metadata we call Pedigree. This stores the when, where, and how elements of the data we collect. It’s not a significant extension to the data model Nathan describes, but it’s an important one.

All in all, this is a great step forward. Hope to see more of this kind of stuff out of you, Nathan!

Posted in Hadoop, Thrift

Accelerate your test suite with Cascading 1.1

One big downside of using Cascading for our applications has been the runtime of our regression test suite. We test with quantities of data nowhere near our regular production volume, but we still end up running lots of jobs. In our experience, this ends up making our tests take a long time (in the tens of minutes), killing our ability to iterate quickly.

After looking more deeply into the issue, we discovered that it all came down to one particular polling interval. When Cascading launches a Flow, it launches one or more actual Hadoop jobs and then waits until each completes before launching the next in the pipeline. The problem is that the amount of time Cascading waits between checks for job completion is set to something like 5 seconds by default! This setting makes plenty of sense for real-world jobs, which should all take at least several minutes; at that scale, 5 seconds isn’t going to make a difference one way or another. In our ultra-short job scenario, however, it makes all the difference. If your Flow works out to 10 jobs that run serially, the fastest it could complete is 50 seconds.
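A quick way to see how the polling interval dominates short test flows is to treat it as a fixed per-job floor. The sketch below assumes each job finishes almost immediately, so a serial chain pays roughly one full polling interval per job. PollingOverhead and minimumWaitMillis are hypothetical names, not part of Cascading.

```java
/** Rough lower bound on time a serial chain of very short jobs spends waiting on polls. */
final class PollingOverhead {
  private PollingOverhead() {}

  /**
   * Assumes every job completes almost instantly, so each one costs a full
   * polling interval before the flow notices and launches the next job.
   */
  static long minimumWaitMillis(int serialJobs, long pollingIntervalMillis) {
    return serialJobs * pollingIntervalMillis;
  }

  public static void main(String[] args) {
    System.out.println(minimumWaitMillis(10, 5000)); // roughly the default interval
    System.out.println(minimumWaitMillis(10, 100));  // a tuned, test-only interval
  }
}
```

With 10 serial jobs, a 5-second interval gives the 50-second floor described above, while a 100 ms interval brings it down to about one second.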

Initially, we customized Cascading 1.0.8 to reduce this wait time down to about 100 milliseconds. However, when we recently upgraded to Cascading 1.1, we were pleasantly surprised to find that this polling interval was now configurable. Generally, it looks something like this:

// Interval is in milliseconds: check for job completion every 100 ms instead of the default.
Map<Object, Object> properties = new HashMap<Object, Object>();
properties.put("cascading.flow.job.pollinginterval", 100);
new FlowConnector(properties).connect(...).complete();

With a convenient way to change this parameter, the only other thing we need is a convenient way to set its value per environment. Ideally, we’d like to leave the parameter alone during production runs and only set it low during our test suite. This is a little tricky because, unlike Hadoop, Cascading itself doesn’t provide any global configuration framework.

The solution we ended up going with was to provide a class with a static method for getting new FlowConnectors, used in place of the standard constructor. This method allows the user to provide any options they need and merges in the polling interval for the current environment. It looks something like this:

import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowConnector;

public final class CascadingHelper {
  // Environment-wide defaults, such as the job polling interval. The test base
  // class sets entries here; production runs leave the polling interval unset.
  public static final Map<Object, Object> DEFAULT_PROPERTIES = new HashMap<Object, Object>();

  private CascadingHelper() {}

  public static FlowConnector getFlowConnector() {
    return getFlowConnector(new HashMap<Object, Object>());
  }

  // Start from a copy of the defaults, then apply the caller's properties so they win.
  public static FlowConnector getFlowConnector(Map<Object, Object> properties) {
    Map<Object, Object> combined = new HashMap<Object, Object>(DEFAULT_PROPERTIES);
    combined.putAll(properties);
    return new FlowConnector(combined);
  }
}

Finally, in our test suite, we made all of our tests inherit from a common base test class that reconfigures the default polling interval in the class constructor. Voilà, our test suite takes 40% less time!
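To show the helper and the base test class working together without Cascading on the classpath, here is a self-contained sketch. HelperSketch stands in for CascadingHelper and returns the merged property map instead of a FlowConnector. FastPollingTestBase and ExampleFlowTest are hypothetical names; in a real suite the base class would also extend your test framework’s base class (for example, JUnit’s TestCase).

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for CascadingHelper: same merge logic, but returns the properties. */
final class HelperSketch {
  static final String POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
  static final Map<Object, Object> DEFAULT_PROPERTIES = new HashMap<Object, Object>();

  private HelperSketch() {}

  // Defaults first, then caller-supplied properties, so the caller wins on conflicts.
  static Map<Object, Object> connectorProperties(Map<Object, Object> properties) {
    Map<Object, Object> combined = new HashMap<Object, Object>(DEFAULT_PROPERTIES);
    combined.putAll(properties);
    return combined;
  }
}

/** Hypothetical base class that every test extends; its constructor shortens polling. */
abstract class FastPollingTestBase {
  protected FastPollingTestBase() {
    HelperSketch.DEFAULT_PROPERTIES.put(HelperSketch.POLLING_INTERVAL, 100);
  }
}

/** Example test: constructing it applies the fast polling default. */
class ExampleFlowTest extends FastPollingTestBase {
  public static void main(String[] args) {
    new ExampleFlowTest();
    Map<Object, Object> props = HelperSketch.connectorProperties(new HashMap<Object, Object>());
    System.out.println(props.get(HelperSketch.POLLING_INTERVAL));
  }
}
```

A flow built without going through the helper, or in a test that skips the base class, never sees the shortened interval, which is exactly the vigilance problem described next.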

The only downside we currently face is that if a developer forgets to use either the FlowConnector provider or the test base class, the Flows in those tests run slowly, so constant vigilance is required. Still, making this change has caused our build to run somewhere between 2x and 3x as fast, which is just plain awesome.

Posted in Cascading

Dealing with skewed key sizes in Cascading

Rapleaf indexes data from a wide variety of sources about all sorts of people. As a result, some of the people we analyze end up having a lot more data about them stored in our systems than others. For instance, your average person has around a hundred friends, whereas Ashton Kutcher’s Twitter account has millions of followers.

This variance in the size of a “person” in our system has led us to experience some interesting problems when we do things like CoGroup or GroupBy in such a way that all of a person’s data is reduced together. In these cases, sometimes we end up with massively skewed task sizes and runtimes. Since there are generally only a few of these people with millions and millions of datapoints, instead of all reducers taking pretty much the same amount of time, a few will take much, much longer than the rest. This leads to performance degradation and, at times, outright job failure when an overlarge task becomes unable to report in.

The intuitive way to solve this problem is to decrease the amount of data a given reducer is responsible for processing. However, doing so is not as easy as just increasing the number of reducers. Since the problem is not the aggregate amount of data that goes to an individual reducer, but the sheer number of values that just a few keys have, more reducers would just make the successful tasks smaller and the unsuccessful tasks more tightly focused on the few bad keys.

Instead, what you need to do is actually spread the data over more keys so that it will be distributed amongst reducers more evenly. The way that we have accomplished this is by doing what I’ve been thinking of as a “shuffle join”. Basically, you take your natural key and add some additional variation to it in order to form a synthetic key, and then do your joining or grouping on the synthetic key.

Let’s look at how to do this for the CoGroup use case in detail. First, pick a shuffle factor, which we’ll call N. Choose it so that you send at most about 1/N of the data for any given natural key to a single reducer. Next, form synthetic keys in all of your pipes. I think of my pipes as being of two types: keys and values. Every value must find its key, but a key doesn’t need to see every value. To form synthetic keys in the key pipe, emit N copies of each tuple, one with each of the numbers 0 through N-1 added to the natural key. To form synthetic keys in the value pipe, emit each tuple with its natural key plus a random number between 0 and N-1. (Make sure you are using a deterministic means of selecting random numbers as discussed here.) Finally, perform your CoGroup on these pipes, specifying the join fields to be the fields that compose the synthetic key.

The result of this extra setup is that roughly 1/N of your values will get each of the numbers 0 to N-1, and every key will be “copied” once for each number from 0 to N-1. When the pipes go into the join, each synthetic key will get roughly 1/N of the values for its natural key. For most reasonable values of N, this splits your outlier keys into manageable chunks, and your reduce tasks become homogeneous again, giving you back consistent runtimes. After the CoGroup pipe, you’re free to strip the extra field off your tuples to get back the originals and go about your business.
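Because the pipe mechanics can be hard to picture, here is a plain-Java model of the CoGroup recipe above; it is not Cascading code. Each map entry stands in for one reduce group, every key record is copied once per salt, and each value picks a single salt. The post calls for a deterministic random number; this sketch derives the salt from the record’s hashCode, which is one deterministic option and not necessarily the method the post has in mind. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a salted ("shuffle") join on a skewed key. Not Cascading code. */
final class ShuffleJoinSketch {
  private ShuffleJoinSketch() {}

  /** Synthetic key: the natural key plus a salt in [0, n). */
  static String syntheticKey(String naturalKey, int salt) {
    return naturalKey + "#" + salt;
  }

  /** Deterministic salt derived from the record, so a retried task picks the same one. */
  static int saltFor(String naturalKey, String value, int n) {
    return Math.floorMod((naturalKey + "\t" + value).hashCode(), n);
  }

  /**
   * Key side: copy each key record once per salt 0..n-1. Value side: send each
   * value to exactly one salt. Then group on the synthetic key (an inner join).
   * Returns synthetic key -> joined records formatted as "keyRecord|value".
   */
  static Map<String, List<String>> join(Map<String, String> keys, List<String[]> values, int n) {
    Map<String, String> expandedKeys = new HashMap<>();
    for (Map.Entry<String, String> k : keys.entrySet()) {
      for (int salt = 0; salt < n; salt++) {
        expandedKeys.put(syntheticKey(k.getKey(), salt), k.getValue());
      }
    }

    Map<String, List<String>> groups = new HashMap<>();
    for (String[] v : values) { // v[0] = natural key, v[1] = value payload
      String synthetic = syntheticKey(v[0], saltFor(v[0], v[1], n));
      String keyRecord = expandedKeys.get(synthetic);
      if (keyRecord == null) {
        continue; // no key record for this natural key: dropped, as in an inner join
      }
      groups.computeIfAbsent(synthetic, s -> new ArrayList<>()).add(keyRecord + "|" + v[1]);
    }
    return groups;
  }

  public static void main(String[] args) {
    Map<String, String> keys = new HashMap<>();
    keys.put("ashton", "profile-A"); // hot key
    keys.put("jane", "profile-J");   // ordinary key

    List<String[]> values = new ArrayList<>();
    for (int i = 0; i < 1000; i++) values.add(new String[] {"ashton", "friend" + i});
    for (int i = 0; i < 3; i++) values.add(new String[] {"jane", "friend" + i});

    for (Map.Entry<String, List<String>> g : join(keys, values, 4).entrySet()) {
      System.out.println(g.getKey() + " -> " + g.getValue().size() + " values");
    }
  }
}
```

Running main should show the 1,000 values for the hot key split across four groups instead of one, while every joined value still carries its own key record.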

This same strategy can be used to shuffle up GroupBy pipes as well, and more easily at that. Instead of worrying about key and value pipes, just treat everything like a value pipe and add the random number. Each natural key will then be split across up to N synthetic keys, which may land on up to N different reducers.

The unfortunate downside of this strategy is that it’s not suitable for every use case, since any Every pipe downstream won’t see all the values for a natural key. If you are planning to do some aggregation, you need to consider whether it can be done piecemeal. For instance, if you are just counting the records for each key, you can emit each count as a subtotal and combine the subtotals afterward, either with a little Java code or another set of pipes. (After all, the output of this step should be much smaller now, and is likely totally manageable.)
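The subtotal approach for counts can be modeled the same way. This plain-Java sketch (again not Cascading, with hypothetical names) salts a GroupBy, counts per synthetic key as each reducer would, then strips the salt and sums the subtotals, which corresponds to the “little Java code” option. It works because counting can be split and re-added; an aggregation that needs every value for a key at once would not decompose this way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Salted GroupBy followed by a merge of subtotals, modeled in plain Java. */
final class SaltedCountSketch {
  private SaltedCountSketch() {}

  /** Deterministic salt in [0, n) derived from the record (one option among many). */
  static int saltFor(String key, String record, int n) {
    return Math.floorMod((key + "\t" + record).hashCode(), n);
  }

  /** Pass 1: one subtotal per "naturalKey#salt", i.e. per reduce group. */
  static Map<String, Long> subtotals(List<String[]> records, int n) {
    Map<String, Long> partial = new HashMap<>();
    for (String[] r : records) { // r[0] = natural key, r[1] = record payload
      partial.merge(r[0] + "#" + saltFor(r[0], r[1], n), 1L, Long::sum);
    }
    return partial;
  }

  /** Pass 2: drop the salt suffix and sum the (much smaller) set of subtotals. */
  static Map<String, Long> totals(Map<String, Long> partial) {
    Map<String, Long> totals = new HashMap<>();
    for (Map.Entry<String, Long> e : partial.entrySet()) {
      String key = e.getKey();
      totals.merge(key.substring(0, key.lastIndexOf('#')), e.getValue(), Long::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    List<String[]> records = new ArrayList<>();
    for (int i = 0; i < 1000; i++) records.add(new String[] {"ashton", "follower" + i});
    records.add(new String[] {"jane", "follower0"});

    Map<String, Long> partial = subtotals(records, 4);
    System.out.println(partial.size() + " subtotals");
    System.out.println(totals(partial)); // final count per natural key
  }
}
```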

Posted in Cascading, MapReduce

Master Kozikowski

Piotr Kozikowski was one of our star interns during the summer of 2008 and we are excited to welcome him back as a full-time employee starting next week. Piotr recently published his Master’s thesis on data inference, in conjunction with Rapleaf. Here is the abstract of his very well-written paper. Congratulations Piotr!

Although social networking websites have existed for well over a decade, it is only recently (since approx. 2005) that they have become immensely popular. Today, social networking websites play a very important role in the lives of millions of people.

As the user base of social networking sites grows at a rapid pace, so do the opportunities to take advantage of the vast amount of data that becomes available as a result.

The purpose of this thesis is to use data gathered from social networking sites to develop techniques to infer unknown attributes of individuals based on other, known attributes, as well as attributes of people with whom the individuals have a relationship.

The main focus of the thesis is the inference of gender, age, and location. Other attributes are also analyzed, but in much less detail. Additionally, the possibility of assigning weights to the relations among individuals to provide a measure of friendship closeness is also explored.

The data analyzed does not come from one specific social networking website, but rather is composed of publicly available information coming from a number of such websites. It is hoped that this data is representative of the social web in general and, therefore, any findings will also be applicable to the social web in general.

Posted in Miscellaneous

Keeping It Clean

Like most engineering teams, we at Rapleaf do not write perfect code. Whether because of design shortsightedness, changing requirements, poor style, obsoleted functionality or a host of other factors, we often find ourselves wanting to clean up parts of our system. The need to clean up code is common to every individual and development organization, and much has been written about the costs of technical debt and tips for effective refactoring. Somewhat less discussed are the procedural strategies that organizations employ to effectively encourage code cleanup; a team may know how to refactor, but without an effective mechanism may still find itself paralyzed by too frequent, too infrequent, or unmotivated practices.

One of the core cleanup strategies that we use at Rapleaf revolves around an event we call “Sweepleaf”. Roughly every 6 weeks, we designate a day to be a special Sweepleaf day during which engineers set aside their regular work and focus exclusively on cleanup tasks. These tasks often involve rewriting or eliminating code, decommissioning obsolete services, or other work that yields a healthier codebase. At the end of the day, we all share in the catharsis of reporting how many lines of code we deleted or files we removed.

The key benefits of our Sweepleaf process are twofold. First is the obvious: Our code is much leaner, more readable, and generally healthier than it would be otherwise. In the absence of Sweepleaf, a multitude of cleanup tasks might not get addressed before they become crippling. One way I like to think about Sweepleaf is that it is an opportunity to take care of those tasks that are eternally second priority; they are the tasks that you really, really want to take care of when you run across them in the code, but a higher priority deliverable is always in the way.

The less obvious benefit is psychological. Because of the regularity of Sweepleaf, engineers are aware that they will have a cleanup opportunity in the near future, mitigating the frustration of living with imperfect code. They can immediately record the item in our active list of Sweepleaf tasks (maintained in Jira). Further, the simple knowledge that others are improving code quality is a boon to morale. One engineer on the team expresses his feeling that “there’s this great relief – your life just got easier. You get to bask in the glow of an agile culture. You’re lean and mean, and everyone on your team knows it.”

Sweepleaf on its own does not solve every cleanup issue. Some tasks are too large to be addressed within a day, and some are too urgent to be left alone for 6 weeks. In those cases we’ll spend time where it’s needed and make sure the job is done right, regardless of the day. On the whole, though, Sweepleaf both raises our “cleanup consciousness” and motivates us as a team to keep it clean.

Posted in Miscellaneous

The Wrath of DrWho, or Unpredictable Hadoop Memory Usage

A while back we encountered a really peculiar problem with one of our Hadoop apps. The app did a bunch of HDFS operations before launching a small Map/Reduce job and going on to do a bunch of memory intensive operations. It would run very happily for a few days to a week at a time, at which point it would suddenly start failing with exceptions that look like this:

org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.security.AccessControlException: Permission denied: user=DrWho, access=EXECUTE, inode="data":rapleaf:supergroup:rwxr-x---

Who is this curious DrWho user, and why is he trying to access our data? A little digging revealed that DrWho was the user that Hadoop picks when it can’t figure out who you’re supposed to be. Not the most helpful username to pick for that scenario, to say the least.

All the info we found on the issue indicated that when Hadoop can’t figure out user/group info, it’s because of some issue with the machine’s configuration, the “whoami” command, or things along those lines. We checked everything we could think of: our LDAP config, NFS mounts, network connectivity, other HDFS commands on the same machine. Nothing ever solved the issue like we expected. The only way we found to make the app start working again was to literally reboot the machine. Very Windows 95, isn’t it?

Finally, with the help of Cloudera, we were able to figure it out. In order to call the “whoami” command to get user info, the Hadoop code would shell out, which would fork the entire JVM process for an instant. As previously stated, the app in question was a memory-intensive app which was configured to pre-allocate about 95% of the machine’s physical memory. Most of the time, when the machine was fresh, this would work with no issue. However, after a few days of running, I’m guessing some other things would accumulate in memory, so when the fork call would happen, the OS would fail to spawn the shell command’s process because there wouldn’t be enough memory for it.

Ultimately, we were able to fix this problem by pre-allocating less of the machine’s physical memory at JVM startup – more like 75%. Since then we’ve not had the problem.

However, we have run into a related class of problems. In another memory-intensive application, again near the machine’s physical memory limit, we found that we’d very consistently get out-of-memory errors when a number of concurrent copyToLocal calls were running. Again, after some frustration, we were able to track the problem down to a shell command, in this case “chmod”, that was being called at the end of copyToLocal. This time, we were able to reorganize our code to alleviate the issue.

The bottom line is that Hadoop can have some unpredictable behavior when it makes shell calls, so beware.

Posted in HDFS, Hadoop

Person API v3

We just updated our person API to allow queries by social network profile ID. The new-style URLs are also more natural to write. If you want to query by email:

http://api.rapleaf.com/v3/person/email/dummy@rapleaf.com

If you want to query by the SHA1 hash of an email:

http://api.rapleaf.com/v3/person/hash/sha1/1147e414eec8b785fb760f13f7890a767ffaef6e

If you want to query by Myspace profile ID:

http://api.rapleaf.com/v3/person/web/myspace/1234567

If you want to query by Twitter user:

http://api.rapleaf.com/v3/person/web/twitter/janedoe

An API key is also required. You can check out the rest of the documentation here.
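For callers building these URLs programmatically, here is a small Java sketch. It assumes the hash endpoint expects the lowercase hexadecimal SHA-1 of the email string exactly as given (no trimming or lowercasing), which the post does not specify, and it leaves out the API key because the parameter name is not given here. PersonApiUrls is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Builds v3 person lookup URLs in the formats listed above. API key handling omitted. */
final class PersonApiUrls {
  private static final String BASE = "http://api.rapleaf.com/v3/person/";

  private PersonApiUrls() {}

  static String byEmail(String email) {
    return BASE + "email/" + email;
  }

  /** Assumes the email is hashed exactly as passed in; normalize first if the API requires it. */
  static String bySha1(String email) {
    return BASE + "hash/sha1/" + sha1Hex(email);
  }

  /** network is a path segment such as "myspace" or "twitter". */
  static String byProfile(String network, String id) {
    return BASE + "web/" + network + "/" + id;
  }

  /** Lowercase hex SHA-1 of the string's UTF-8 bytes. */
  static String sha1Hex(String s) {
    try {
      byte[] digest = MessageDigest.getInstance("SHA-1").digest(s.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-1 is required on every Java platform", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(byEmail("dummy@rapleaf.com"));
    System.out.println(bySha1("dummy@rapleaf.com"));
    System.out.println(byProfile("twitter", "janedoe"));
  }
}
```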

Posted in Miscellaneous

Cycles of Doom in Batch Processing Workflows

We integrate new data into our databases via a large batch processing workflow. The execution time of this workflow directly affects the time it takes to get new data to our customers, so keeping the runtime small is of paramount importance to us. There’s an interesting effect, which we’ve dubbed the “cycle of doom,” that is critical to understand when optimizing these batch processing workflows.

Let’s look at an example. Let’s say we have a workflow which takes 8 hours to run, and so it processes 8 hours of data each iteration. Let’s say you want to add another piece to the workflow which will add 2 hours to the processing time. So that means the workflow runtime will now be 10 hours, with each iteration processing 10 hours of data.

Wrong.

That additional workflow piece will have a secondary effect on the system. The additional workflow piece added 2 hours to process 8 hours of data. This means that next time the workflow runs, there will be 10 hours of data to process. Since the next iteration has more data, it will take longer to run. Which means the next iteration will have even more data, and so on. This is the “cycle of doom.” How long will this spin out of control? How can we determine whether this stabilizes at some point?

Obviously there’s more to this analysis, so let’s take a more formal look at this effect. First, let’s write an equation for how long it takes one iteration of a workflow to complete:

(ITERATION TIME) = OVERHEAD + (TIME TO PROCESS ONE HOUR OF DATA) * (HOURS OF DATA)

Overhead includes things like time to start jobs, loading bloom filters, fixed network latencies, etc. “Time to process one hour of data” means how much time is spent processing the actual data, minus constant factors such as job-startup time. For simplification, let’s use shorter variable names and write this equation as:

T = O + P * H

At some point, the iteration time will stabilize at a constant runtime. We can find the stabilization point by plugging in T = H; in English, that is the point at which the time it takes to run an iteration equals the number of hours of data the iteration processes. Solving for T, we get:

T = O + P * T
T = O / (1 - P)

There are several interesting results we can take from this equation. First, if processing one hour of data (excluding overhead) takes an hour or more, that is, P >= 1, then you will hit a cycle of doom: there is no stable point, and the iteration time grows without bound.

The more interesting result is the effect of overhead. Overhead gets multiplied. So if you have enough cluster capacity to process one hour of data in half an hour, each hour of overhead will actually add 2 hours to your iteration time. If you barely have enough cluster capacity to process one hour of data (let’s say it takes 59 minutes), then each hour of overhead will add 60 hours to your iteration time. As you can see, anything you can do to decrease overhead or decrease the data processing time (like by adding more machines) can have a huge effect on your performance.
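The recurrence behind the cycle of doom is easy to simulate. In this sketch (hypothetical names), each iteration’s duration O + P * H becomes the number of hours of data the next iteration must process. For P < 1 the runtime converges to O / (1 - P) from any starting backlog; for P >= 1 it keeps growing. The overhead multiplier 1 / (1 - P) discussed above is just steadyState with O = 1.

```java
/** Iterates H(k+1) = O + P * H(k): each run's duration becomes the next run's backlog. */
final class CycleOfDoom {
  private CycleOfDoom() {}

  /** Hours the next iteration takes (and therefore hours of data the one after it sees). */
  static double nextIteration(double overhead, double p, double hoursOfData) {
    return overhead + p * hoursOfData;
  }

  /** Closed-form stable iteration time T = O / (1 - P); infinite when P >= 1. */
  static double steadyState(double overhead, double p) {
    if (p >= 1.0) {
      return Double.POSITIVE_INFINITY; // cycle of doom: no stable point
    }
    return overhead / (1.0 - p);
  }

  /** Runs the recurrence from a starting backlog for a fixed number of iterations. */
  static double simulate(double overhead, double p, double startHours, int iterations) {
    double h = startHours;
    for (int i = 0; i < iterations; i++) {
      h = nextIteration(overhead, p, h);
    }
    return h;
  }

  public static void main(String[] args) {
    System.out.println(steadyState(1.0, 0.5));         // 1 h of overhead at 30 min per hour of data
    System.out.println(steadyState(1.0, 59.0 / 60.0)); // 1 h of overhead at 59 min per hour of data
    System.out.println(simulate(12.5, 7.0 / 12.0, 8.0, 200)); // converges toward O / (1 - P)
  }
}
```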

Let’s take a look at a real-world example with Rapleaf’s workflow. We recently replaced a piece of our workflow that was writing large amounts of data to a database locally and then copying that database into HDFS. The database was getting really fragmented, so it would take 10 hours to do that copy. We replaced the database with a different system that does streaming writes, which got rid of the fragmentation problem. Our workflow runtime went from 30 hours to 6 hours. So getting rid of 10 hours of overhead produced a 5x improvement in iteration time, even though it accounted for only a third of the original runtime.

Let’s plug the numbers into our equation. We end up with:

30 = O / (1 - P)
6 = (O - 10) / (1 - P)

Solving, we get P = 7/12 and O = 12.5. The 10 hours we removed were 4/5 of our total overhead, leaving 2.5 hours; since the stable runtime is proportional to overhead for a fixed P, that explains the 5x improvement. We also learned that our “data processing rate” is about 35 minutes (7/12 of an hour) for each hour of data.
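The same solve can be written out directly. Subtracting the second equation from the first gives T1 - T2 = R / (1 - P), where R is the overhead removed, so 1 - P = R / (T1 - T2) and O = T1 * (1 - P). OverheadFit is a hypothetical name, and the inputs in main are the numbers from this example.

```java
/** Recovers O and P from runtimes before and after removing a known chunk of overhead. */
final class OverheadFit {
  private OverheadFit() {}

  /**
   * From T1 = O / (1 - P) and T2 = (O - R) / (1 - P):
   * 1 - P = R / (T1 - T2) and O = T1 * (1 - P). Returns {O, P}.
   */
  static double[] fit(double before, double after, double removedOverhead) {
    double oneMinusP = removedOverhead / (before - after);
    return new double[] {before * oneMinusP, 1.0 - oneMinusP};
  }

  public static void main(String[] args) {
    double[] result = fit(30.0, 6.0, 10.0);
    System.out.println("O = " + result[0] + " hours, P = " + result[1]);
  }
}
```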

This equation doesn’t model everything. One factor not modeled in Rapleaf’s workflow is the fact that every iteration adds data to our database, which will slightly raise query times for the next iteration. There’s also variance in the amount of data we receive each hour which changes the analysis somewhat. Even though this equation is just an approximation, we’ve found it to be a fairly close approximation and it has really helped us understand the fundamental factors affecting our iteration times.

Posted in Hadoop, MapReduce

Command-line auto completion for Hadoop DFS commands

We like to keep things simple here at Rapleaf. One small tweak we made right after we installed Hadoop was to alias 'hadoop dfs' to 'hdfs'. It rolls off the fingers nicely. We are also constantly typing 'hdfs -ls this' or 'hdfs -du that'. If we are not sure what this/that is, we type 'hdfs -ls /this/what', then 'hdfs -ls /this/what/ever', followed by a copy and paste or two. Thanks to our recent HackLeaf day and Nathan’s great idea, we no longer have to go through all of that. Just type 'hdfs -ls [tab]' and it works just like bash command-line completion.

This was easy to implement once I found the programmable completion tool by Ian Macdonald. I just added the following section to bash_completion:

# hdfs(1) completion
#
have hadoop &&
_hdfs()
{
  local cur prev listing

  COMPREPLY=()
  cur=${COMP_WORDS[COMP_CWORD]}
  prev=${COMP_WORDS[COMP_CWORD-1]}

  # First argument after hdfs: complete the subcommand itself.
  if [[ "$prev" == hdfs ]]; then
    COMPREPLY=( $( compgen -W '-ls -lsr -du -dus -count -mv -cp -rm
      -rmr -expunge -put -copyFromLocal -moveToLocal -mkdir -setrep
      -touchz -test -stat -cat -tail -chmod -chown -chgrp -help' -- "$cur" ) )
    return 0
  fi

  # Subcommands that take a path: complete HDFS paths. Call hadoop directly,
  # since an alias such as hdfs is not reliably expanded inside functions.
  case "$prev" in
    -ls|-lsr|-du|-dus|-cat|-mkdir|-put|-rm|-rmr|-tail|-cp)
      if [[ -z "$cur" ]]; then
        # No path yet: list "/" rather than the HDFS home directory.
        listing=$( hadoop dfs -ls / 2>/dev/null | grep -v '^Found' | awk '{print $8}' )
      elif [[ "$cur" == */ ]]; then
        # Path ends in a slash: list that directory.
        listing=$( hadoop dfs -ls "$cur" 2>/dev/null | grep -v '^Found' | awk '{print $8}' )
      else
        # Partial name: quote the glob so HDFS, not the local shell, expands it.
        listing=$( hadoop dfs -ls "$cur*" 2>/dev/null | grep -v '^Found' | awk '{print $8}' )
      fi
      COMPREPLY=( $( compgen -W "$listing" -- "$cur" ) )
      ;;
  esac
} &&
complete -F _hdfs hdfs

I’m sure there are some ways to make the code more elegant, but it is called HackLeaf, after all. This bit of code builds on top of other functions in the script, but the basic idea is pretty simple. cur contains the current word you are typing, so this would be a partial command or partial path. prev contains the previous word. If the previous word is hdfs, then we present the user with valid arguments to hdfs. If the previous word is -ls (or any other command where you want a path/file), then present the user with the possibilities for that path or partial path. HDFS defaults to the user’s home directory if no path is provided, so we override that by presenting the user with the possibilities under “/”. Finally, COMPREPLY returns the possibilities to the user on the command-line.

Be sure to check out some of the other features of bash_completion, particularly ssh and chkconfig.

Posted in HDFS, Hadoop, bash

Dead Simple MapReduce Workflow Configuration

If you use MapReduce for any real-world application, chances are your workflow consists of more than one MapReduce job. Rapleaf has workflows consisting of over one hundred jobs. Often, you need configuration settings that apply to every job in the workflow. For example, you may want each job to run in the same fair scheduler pool or use a certain number of reducers.

One way to do this would be to configure each job at the code level. Unfortunately, this can be tedious and error-prone, since if you add a new job to the workflow you need to remember to add the proper configurations. Fortunately, there’s a better way.

Hadoop has a static method called “Configuration.addDefaultResource” that allows you to specify a file to be loaded into the configuration by default. Like Cascading Style Sheets, Hadoop will load the configurations one file at a time, with configurations from later files overriding those from earlier ones. A static initializer in the JobConf class causes Hadoop to load in “mapred-default.xml” and “mapred-site.xml”.
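To make the override order concrete, here is a toy model of layered resources in which a later file wins for any key it sets. It uses hypothetical names and is not Hadoop’s Configuration class: it skips XML parsing and ignores Hadoop’s final properties, which a later resource cannot override. The property values in main are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of layered default resources: later layers override earlier ones. */
final class LayeredConfigSketch {
  private LayeredConfigSketch() {}

  /** Applies layers in load order, e.g. mapred-default, mapred-site, application-site. */
  static Map<String, String> resolve(List<Map<String, String>> layers) {
    Map<String, String> effective = new LinkedHashMap<>();
    for (Map<String, String> layer : layers) {
      effective.putAll(layer); // a later file wins on any key it sets
    }
    return effective;
  }

  public static void main(String[] args) {
    Map<String, String> mapredDefault = new HashMap<>();
    mapredDefault.put("mapred.reduce.tasks", "1");
    Map<String, String> mapredSite = new HashMap<>();
    mapredSite.put("mapred.reduce.tasks", "20");
    Map<String, String> applicationSite = new HashMap<>();
    applicationSite.put("mapred.reduce.tasks", "50");

    List<Map<String, String>> layers = new ArrayList<>();
    layers.add(mapredDefault);
    layers.add(mapredSite);
    layers.add(applicationSite); // registered last, so it overrides the others
    System.out.println(resolve(layers).get("mapred.reduce.tasks"));
  }
}
```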

To create an application level configuration, you will want to perform the following steps:

1. Create an “application-site.xml” file and put it in the same directory from which you run your job jar.
2. In the main method of your code, add the lines:

new JobConf(); //ensure mapred-default and mapred-site get loaded in first
Configuration.addDefaultResource("application-site.xml");

3. Ensure that “.” is in your classpath when you run the job jar so that Hadoop can find the “application-site.xml” resource.

A side benefit of this approach is that if you need to tweak some settings in the middle of a workflow, you just need to edit the application-site.xml file and each subsequent job will pick up the new settings.

Posted in Hadoop, MapReduce