Pseudo-Combiners in Cascading

In order to get maximum performance from MapReduce, you need to minimize the amount of data that you have to transfer around the network. If nearly your entire input must be transferred from your mappers to your reducers, then you’ll be putting a great deal of stress on your disks and network. One commonly recommended technique is the use of combiners, which allow part of the reducing to be done during the map phase when you are performing associative and commutative aggregations such as counting, summing, or finding the minimum or maximum. Combiners are especially valuable when you have very few group keys, since otherwise large numbers of tuples would be funneled into a small number of reducers.

Unfortunately, while MapReduce supports combiners, Cascading does not. Instead, we decided to hack together our own solution, which we’re calling a “pseudo-combiner”.  A traditional combiner maintains a buffer of tuples and does sorting and aggregation when the buffer fills up. Our pseudo-combiner maintains a map with the group field as the key and the combiner output as the value. For every tuple, we will perform the combiner functionality when we update the entry in the map. This is better for our most common use case, which involves counting billions of values in fewer than 100 categories. Since we can very easily hold all of our categories in the map, we can ensure that we only have one output per key value from each mapper.

Our implementation uses an LRUHashMap, which is an in-house extension of LinkedHashMap. The LRUHashMap uses the LinkedHashMap to maintain a cap on how many entries are allowed in the map and evicts the least recently used key-value pair when the map grows beyond its limit. The evicted pair is made available so that we can emit the correct output for it. When all the input tuples have been read, we simply flush the contents of the map and emit the necessary tuples for each remaining entry.

The abstract class we’ve designed has three abstract methods that every combiner must implement:

import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.tuple.Tuple;

public abstract class Combiner<T> extends BaseOperation implements Function {
  protected abstract T initialize(Tuple tuple);
  protected abstract void update(T toUpdate, Tuple newTuple);
  protected abstract Tuple getTuple(T mapValue);
}

The initialize function is called the first time we see a key, and allows us to store the initial combiner value for the key.  The update function is called whenever we see a value for a key we’ve already seen.  The current value in the map and the new tuple are passed in.  The final getTuple function is called whenever we need to decide what to emit for an entry in our map.  This occurs on eviction and when we flush the contents of the map at the end.

Thus, our use case of counting would look like this:

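import cascading.tuple.Tuple;

// A one-element long[] serves as a mutable counter. (A Long would not work:
// Long is immutable, so incrementing the parameter inside update() would
// never change the value stored in the map.)
public class MapsideCount extends Combiner<long[]> {
  // When we see the first tuple, we initialize the count to 1
  protected long[] initialize(Tuple tuple) {
    return new long[] {1L};
  }

  // On each subsequent tuple, we increment the stored count in place
  protected void update(long[] count, Tuple newTuple) {
    count[0]++;
  }

  // When we need to emit a tuple for the key, we emit the count stored in the map
  protected Tuple getTuple(long[] count) {
    return new Tuple(count[0]);
  }
}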
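To make the call order concrete, here is a framework-free sketch in plain Java. It is not Rapleaf’s code: it assumes tuples can be modeled as List<Object> and the group key as a String, and the names SimpleCombiner and CountSketch are hypothetical. An access-ordered LinkedHashMap stands in for the in-house LRUHashMap, so initialize runs on a key’s first appearance, update on later ones, and getTuple both when the least recently used key is evicted and when the map is flushed at the end of input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free analogue of the Combiner<T> contract.
// Tuples are modeled as List<Object> rather than Cascading's Tuple.
abstract class SimpleCombiner<T> {
  protected abstract T initialize(List<Object> tuple);
  protected abstract void update(T toUpdate, List<Object> newTuple);
  protected abstract List<Object> getTuple(T mapValue);

  // Aggregates tuples by key, holding at most maxEntries (>= 1) keys in memory.
  // Returns the emitted rows, each of the form [key, ...getTuple output].
  public List<List<Object>> run(List<String> keys, List<List<Object>> tuples, int maxEntries) {
    List<List<Object>> emitted = new ArrayList<>();
    // accessOrder = true: the first entry in iteration order is the least recently used
    LinkedHashMap<String, T> map = new LinkedHashMap<>(16, 0.75f, true);
    for (int i = 0; i < tuples.size(); i++) {
      String key = keys.get(i);
      T value = map.get(key);
      if (value != null) {
        update(value, tuples.get(i));               // key already in the map
        continue;
      }
      map.put(key, initialize(tuples.get(i)));      // first time we see this key
      if (map.size() > maxEntries) {                // over the cap: evict the LRU key
        Iterator<Map.Entry<String, T>> it = map.entrySet().iterator();
        Map.Entry<String, T> eldest = it.next();
        emitted.add(withKey(eldest.getKey(), getTuple(eldest.getValue())));
        it.remove();
      }
    }
    for (Map.Entry<String, T> e : map.entrySet()) { // end of input: flush the rest
      emitted.add(withKey(e.getKey(), getTuple(e.getValue())));
    }
    return emitted;
  }

  private static List<Object> withKey(String key, List<Object> output) {
    List<Object> row = new ArrayList<>();
    row.add(key);
    row.addAll(output);
    return row;
  }
}

// Counting: a one-element long[] is the mutable per-key counter.
class CountSketch extends SimpleCombiner<long[]> {
  protected long[] initialize(List<Object> tuple) { return new long[] {1L}; }
  protected void update(long[] count, List<Object> newTuple) { count[0]++; }
  protected List<Object> getTuple(long[] count) {
    return Collections.<Object>singletonList(count[0]);
  }
}
```

With a cap of 2 and the keys a, b, a, c, a, key b is evicted when c arrives (a was touched more recently), so the rows come out as [b, 1], [c, 1], [a, 3].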

Our adoption of pseudo-combiners gave us huge performance improvements, cutting one of our stats jobs from an hour down to around two minutes, which means we can now run those stats hourly.

Posted in Cascading

Application Deployment at Rapleaf

One challenge we constantly face at Rapleaf is how to remain agile as we grow. In the early days, we would say “I’m going to deploy” to a handful of other engineers in the room. When we needed to move beyond that, we designed a deployment system with 3 key objectives:

    Remain agile. Engineers should be able to deploy as frequently as they need to.
    Communicate. Other engineers should know when a deploy is about to happen and when it is finished.
    Be robust. The system should have access controls, prevent simultaneous deploys and check for common problems.

We were able to create a simple deployment system using Screen, CruiseControl, Capistrano and Openfire (Jabber). Screen was an obvious choice given all of our requirements. Before Screen, we all deployed from our local machines. There were a couple of problems with this: several people could deploy at once, and their environments could differ. Using a shared Screen session on a single machine solved both of these problems. Screen also offers access controls, multiple windows and scrollback history. We have a window for each application, and our screenrc file looks something like this:

shell -${SHELL}

# Set status line
caption always "%{B}%t deploy %{R}REMEMBER TO 'svn up' %{B}%? @%u%?%? %{d}[%h]%?%=%D %M %d %C %A"

# Make the window list (usually invoked by C-a ") also available as C-a space
bind ' ' windowlist -b

# Create a screen window per application
screen -ln -t "app1" -h 5000 1
screen -ln -t "app2" -h 5000 2

# Allow multiple users to connect
multiuser on

# Engineers that can deploy
acladd engineer1,engineer2

CruiseControl is a great continuous integration tool. We always specify a revision when we deploy, so we can easily check CruiseControl to make sure that the revision has passed the test suite.

Capistrano is the basis for all of our application deploys, including Java apps. We identified several functions we wanted to perform in every deploy and put those into a common library. Those common functions include notifying a Jabber conference room on start and finish, running a sanity check on the revision being deployed and dealing with our load balancer. Capistrano has many other nice features that we use, but I won’t go into them here.

We’ve been running this system for a while and really like it. We were able to meet all of our objectives with a simple design and some great open-source tools.

Posted in Operations

Avoiding Java varargs snafus

Since Java 1.5, Java has allowed you to take advantage of “varargs”, a usability feature that many other languages support. It lets you write really clean code and support some pretty cool use cases.

However, there is at least one possible pitfall of using varargs. Consider the method below:

public boolean filter() {
  // (do some filtering and return the result)
}

filter();

Let’s say you factor the existing filtering logic out into a FilterStrategy class, and change the method to take an array of FilterStrategy objects using varargs syntax so that it looks like this instead:

public boolean filter(FilterStrategy... strategies) {
  for (FilterStrategy strategy : strategies) {
    // (use each strategy)
  }
  // (return the result)
}

filter();

The interesting thing to take note of is that your refactoring actually didn’t break any of the callers – it just silently turned them into no-ops. This is because, as the docs say, varargs “automates and hides the process” of passing an array. Putting no arguments in a call to filter is the same as

filter(new FilterStrategy[0]);

In some cases, this may be what you want. However, in the case we described above, it’s certainly not – it’s hard to imagine a filter method where not filtering anything is the common case.

The problem here arises from the misuse of varargs. Using varargs basically means “zero or more”, when in fact this method is looking for “one or more”. However, nothing in the method signature is actually enforcing this. Luckily, in cases like this, there is an easy solution:

public boolean filter(FilterStrategy first, FilterStrategy... rest) {
  // (use first)
  for (FilterStrategy strategy : rest) {
    // (use each remaining strategy)
  }
  // (return the result)
}

// this will be a compile error
filter();

Now, after the refactoring, the compiler will tell you that you’ve made an error in calling the filter method.
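Here is a small, self-contained sketch of the two signatures side by side. Since the post elides the filtering logic, the FilterStrategy.accept(String) method, the record parameter and the names Filters, filterZeroOrMore and filterOneOrMore are all hypothetical, and it assumes a record passes only if every strategy accepts it.

```java
// Hypothetical stand-in for the FilterStrategy in the post.
interface FilterStrategy {
  boolean accept(String record);
}

final class Filters {
  private Filters() {}

  // "Zero or more": filterZeroOrMore(record) compiles and applies no strategies.
  static boolean filterZeroOrMore(String record, FilterStrategy... strategies) {
    for (FilterStrategy strategy : strategies) {
      if (!strategy.accept(record)) {
        return false;
      }
    }
    return true; // with no strategies, every record silently passes
  }

  // "One or more": a call with no strategies is a compile error.
  static boolean filterOneOrMore(String record, FilterStrategy first, FilterStrategy... rest) {
    if (!first.accept(record)) {
      return false;
    }
    for (FilterStrategy strategy : rest) {
      if (!strategy.accept(record)) {
        return false;
      }
    }
    return true;
  }
}
```

Filters.filterZeroOrMore("") compiles and lets the record through, while Filters.filterOneOrMore("") is rejected by the compiler because no argument binds to first.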

Finally, it’s worth noting that another way to avoid problems like this is to just not use varargs at all when they don’t make sense. I know that I personally have used varargs in the past when I knew the production code would be using an array, but the testing code would like to just pass one or two of something. This is probably a significant misuse of the construct for very little usability gain, so I would discourage others from going down the same road. (You can usually approximate the same effect with Arrays.asList anyway.)

Posted in Miscellaneous

Parallelized bloom filter creation with Map/Reduce

As we’ve mentioned in the past, bloom filters are an important part of our workflow. They allow us to quickly skip a large portion of the records that we’re not interested in, thinning out the amount of data that has to be CoGrouped in our Cascading flows.

Up until recently, we’ve just been creating our bloom filters on the machine that launched the filtering job using good old single-threaded Java operations. However, as the amount of data we process daily has grown, the filter creation step alone has started to take a fair chunk of time. (We’re talking as much as 10 minutes to make the filter so that a 2 hour filtering job can run.) While it’s certainly not the only bottleneck in our workflow, we do this kind of filter-aided join in a lot of places, so several 10-minute startup times add up to hours over the course of a day.

We decided that we would make this bloom filter creation parallel using MapReduce so that we can harness the full capacity of our cluster, rather than slamming a single CPU or mucking around with traditional multithreaded programming.

There are a few different ways you can go about this sort of thing. One way is to use a pseudo-combiner to output one filter per map task, and then combine them all together with a bitwise OR in the reduce step. While this would work logically, on further examination it’s really not such a hot idea, because every map task will output an entire filter’s worth of data, and a single reducer will have to read in TONS of these “tuples”. I could actually imagine this approach taking longer just due to the additional data volume. There are a small handful of ways that you might be able to improve this approach, like making the group into a “shuffle group” (like a “shuffle join”) to increase the number of reducers, but I think we’ve found a better way.

The better way of building the filter is to expose a little bit of the guts of your filter to MapReduce and take advantage of MapReduce’s natural grouping and aggregation properties. First, decide on the size of the bit vector and the number of hash functions you want your filter to use. Also, make sure that your hash function is adequately exposed so that it can be used outside of the filter. Then, in each mapper, for each key you want in the filter, apply each of the hash functions, mod each result by the vector size to get the bit position in the vector, and emit them all. These positions could immediately be set in the output vector, but we’re not ready for that yet.
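The map-side step can be sketched as below. The post doesn’t say which hash functions were used, so this assumes a common substitute, double hashing: the i-th position is (h1 + i * h2) mod m for two base hashes, here the CRC32 of the key’s UTF-8 bytes and String.hashCode. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical hashing for the map side of filter construction.
// Double hashing: position_i = (h1 + i * h2) mod vectorSize.
final class BloomPositions {
  private BloomPositions() {}

  // Returns the numHashes bit positions a mapper would emit for one key.
  static long[] positions(String key, int numHashes, long vectorSize) {
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    long h1 = crc.getValue();
    long h2 = key.hashCode() | 1L; // forced odd so it is never zero
    long[] out = new long[numHashes];
    for (int i = 0; i < numHashes; i++) {
      // floorMod keeps the result in [0, vectorSize) even if the sum overflows negative
      out[i] = Math.floorMod(h1 + i * h2, vectorSize);
    }
    return out;
  }
}
```

Whatever hashing you choose, the same function has to be used later when checking membership, which is why it needs to be exposed outside the filter class.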

Instead, decide in advance on the optimal number of reducers for your job and make that the “partitioning factor” for segments of the filter, dividing the bit positions up such that each reducer gets a contiguous range. For instance, let’s say I’m going to build a vector that takes up 100 million bits. I’d like to have 10 reducers, which means each reducer will be responsible for a range of 10 million bits (about 1.2MB). Once you have picked this value, it is trivial to compute which range a given bit position belongs in (integer-divide the position by the range size), and you can emit the bit position with the range as the key. This will cause all the bits in the same partition to end up in the same reducer, where they can be merged into a single bit vector.
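The partitioning and the reduce step might look like the following sketch. It assumes bit positions are longs and that each range is small enough to index with an int (10 million bits easily is); FilterSlices and its methods are hypothetical names.

```java
import java.util.BitSet;

// Hypothetical helpers for splitting the bit vector into contiguous ranges,
// one per reducer.
final class FilterSlices {
  private FilterSlices() {}

  // Bits per partition, rounded up so the last range covers the tail.
  static long rangeSize(long vectorSize, int numPartitions) {
    return (vectorSize + numPartitions - 1) / numPartitions;
  }

  // Map side: the key to emit a bit position under.
  static int partitionOf(long position, long rangeSize) {
    return (int) (position / rangeSize);
  }

  // Reduce side: set every position of one partition in a slice, relative to
  // the start of the range. Setting a bit twice is harmless.
  static BitSet buildSlice(int partition, long rangeSize, Iterable<Long> positions) {
    BitSet slice = new BitSet();
    long offset = (long) partition * rangeSize;
    for (long position : positions) {
      slice.set((int) (position - offset));
    }
    return slice;
  }
}
```

With 100 million bits and 10 reducers, the range size is 10,000,000, so position 25,000,000 is emitted under partition 2 and lands at index 5,000,000 of that reducer’s slice.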

After processing all the bit positions, each reducer will write out a single value, which is a slice of the overall bit vector. Then it’s just a matter of looping over the reducers’ output and combining the slices into a single bloom filter, which you can send off wherever you need it.
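Stitching the slices back together is a simple loop. This sketch assumes the reducers’ output has been read into a map from partition number to java.util.BitSet, and that the full vector fits in a BitSet (int indices, so under about 2 billion bits); FilterAssembly is a hypothetical name.

```java
import java.util.BitSet;
import java.util.Map;

// Hypothetical final step: copy each reducer's slice to its offset in the full vector.
final class FilterAssembly {
  private FilterAssembly() {}

  static BitSet assemble(Map<Integer, BitSet> slicesByPartition, long rangeSize) {
    BitSet full = new BitSet();
    for (Map.Entry<Integer, BitSet> e : slicesByPartition.entrySet()) {
      long offset = e.getKey() * rangeSize;
      BitSet slice = e.getValue();
      // Visit only the set bits of the slice
      for (int i = slice.nextSetBit(0); i >= 0; i = slice.nextSetBit(i + 1)) {
        full.set(Math.toIntExact(offset + i));
      }
    }
    return full;
  }

  // A key is possibly present only if all of its bit positions are set.
  static boolean mightContain(BitSet filter, long[] positions) {
    for (long p : positions) {
      if (!filter.get(Math.toIntExact(p))) {
        return false;
      }
    }
    return true;
  }
}
```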

Another nice optimization is to add a “distinct” combiner to the end of the map phase. This makes sure that if there are any collisions (or outright duplicate keys) in a given map task, you’ll only output one of them. This can go a long way towards reducing the size of your intermediate data and speeding the job up. (Note that it’s not necessary to do any distincting before merging it into the filter slice, since that naturally distincts them due to its idempotent properties.)
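A map-side distinct can reuse the pseudo-combiner idea: remember recently emitted positions in a bounded, access-ordered set and skip repeats. This is a hypothetical sketch (MapSideDistinct is not from the post), with a LongConsumer standing in for whatever emits a tuple. Because the set is bounded, a position can slip through a second time after being evicted; as noted above, that is harmless, since setting a bit twice has no effect.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical map-side "distinct" for bit positions: forwards a position only
// if it is not among the maxEntries most recently seen positions.
final class MapSideDistinct {
  private final Set<Long> recent;
  private final LongConsumer emit;

  MapSideDistinct(final int maxEntries, LongConsumer emit) {
    this.emit = emit;
    // Access-ordered map with a size cap, viewed as a Set
    this.recent = Collections.newSetFromMap(new LinkedHashMap<Long, Boolean>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
        return size() > maxEntries;
      }
    });
  }

  void accept(long position) {
    // Set.add returns false if the position is still remembered
    if (recent.add(position)) {
      emit.accept(position);
    }
  }
}
```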

Implementing this process has turned our 10+ minute filter creations into about 1 minute MapReduce jobs. All in all, a big win! Many thanks to star Rapleaf engineer Takashi, who implemented this for us in record time.

Posted in Miscellaneous

Faster string to UTF-8 encoding in Java

Update: It turns out that after further investigation, the performance improvements didn’t hold up when some uncovered correctness bugs forced some code changes. The patch was rolled back, so we’re stuck with the same old encoding mechanism. Sigh.

I’ve spent a lot of time profiling Thrift serialization and deserialization, and one thing that has always stood out is Java’s UTF-8 String encoding and decoding. I’ve monkeyed around with different ways to make this faster, but I’ve always come up short of any big improvements.

The culprit is the notoriously slow String.getBytes("UTF-8") call. While it is an incredibly convenient method for getting your string’s UTF-8 representation as a byte array, for some reason, it takes up a ton of time. To boot, the method can only encode into brand new byte arrays, meaning you are also going to incur both an object creation to encode the string and a byte array copy to get that encoded string into your final output. (On the decoding side, however, you can pass in a buffer, offset, and length, so there’s no need for an extra array copy if you know what you’re doing.)

In looking for alternatives, I found my way to CharsetEncoder. I was pretty hopeful about this one – it doesn’t take a string charset each time, and it allows encoding into a ByteBuffer, so it seemed promising. Alas, it turned out to be slower than getBytes, if only marginally. It seems like this particular class is more suited to situations when you need more control over the exact same encoding process that getBytes taps into.
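For reference, encoding into a caller-supplied buffer with a reusable CharsetEncoder looks roughly like this. It uses only standard java.nio APIs; the wrapper class name is hypothetical, and the sketch makes no claim about its speed relative to getBytes.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

// Encodes into a caller-supplied ByteBuffer with one reusable CharsetEncoder.
// Not thread-safe: a CharsetEncoder must not be shared between threads.
final class ReusableUtf8Encoder {
  private final CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();

  // Returns the number of bytes written. Throws BufferOverflowException if out
  // is too small, or MalformedInputException for unpaired surrogates.
  int encode(String s, ByteBuffer out) throws CharacterCodingException {
    int start = out.position();
    encoder.reset(); // required before reusing the encoder for a new input
    CoderResult result = encoder.encode(CharBuffer.wrap(s), out, true);
    if (!result.isUnderflow()) {
      result.throwException();
    }
    result = encoder.flush(out);
    if (!result.isUnderflow()) {
      result.throwException();
    }
    return out.position() - start;
  }
}
```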

Finally, luck shone on me when I was having a look over the Thrift/Protobuf Comparison benchmark project. The project’s maintainer pointed out that DataOutputStream’s writeUTF method was the fastest string encoder in sight, and some other serialization frameworks (like Kryo) had poached that methodology for their own purposes. Tantalized, I quickly set up a benchmark shootout between getBytes, CharsetEncoder, and the writeUTF method. Stunningly, writeUTF is about 2x as fast as the other two methods! Now we’re getting somewhere.
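The core of the writeUTF approach is a plain loop over the string’s chars that writes bytes straight into a preallocated array. Below is a sketch of that idea; it is not the Thrift Utf8Helper code, and Utf8Sketch is a hypothetical name. One caveat worth knowing: DataOutputStream.writeUTF writes “modified UTF-8” (a two-byte length prefix, U+0000 encoded as two bytes, and supplementary characters written as two separate three-byte surrogate encodings), so a loop copied directly from it does not produce standard UTF-8 for every string. This sketch combines surrogate pairs into four-byte sequences so that its output should match getBytes for well-formed strings.

```java
// Hypothetical hand-rolled UTF-8 encoder that writes into a caller-supplied array.
final class Utf8Sketch {
  private Utf8Sketch() {}

  // Safe upper bound on the encoded size: at most 3 bytes per UTF-16 char.
  static int maxEncodedLength(String s) {
    return s.length() * 3;
  }

  // Encodes s into buf starting at offset; returns the number of bytes written.
  // Unpaired surrogates are replaced with '?', the UTF-8 encoder's default replacement.
  static int encode(String s, byte[] buf, int offset) {
    int pos = offset;
    int len = s.length();
    for (int i = 0; i < len; i++) {
      char c = s.charAt(i);
      if (c < 0x80) {                       // 1 byte: ASCII
        buf[pos++] = (byte) c;
      } else if (c < 0x800) {               // 2 bytes
        buf[pos++] = (byte) (0xC0 | (c >> 6));
        buf[pos++] = (byte) (0x80 | (c & 0x3F));
      } else if (Character.isHighSurrogate(c) && i + 1 < len
          && Character.isLowSurrogate(s.charAt(i + 1))) {
        int cp = Character.toCodePoint(c, s.charAt(++i)); // 4 bytes: supplementary
        buf[pos++] = (byte) (0xF0 | (cp >> 18));
        buf[pos++] = (byte) (0x80 | ((cp >> 12) & 0x3F));
        buf[pos++] = (byte) (0x80 | ((cp >> 6) & 0x3F));
        buf[pos++] = (byte) (0x80 | (cp & 0x3F));
      } else if (Character.isSurrogate(c)) {
        buf[pos++] = (byte) '?';            // unpaired surrogate
      } else {                              // 3 bytes: rest of the BMP
        buf[pos++] = (byte) (0xE0 | (c >> 12));
        buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
        buf[pos++] = (byte) (0x80 | (c & 0x3F));
      }
    }
    return pos - offset;
  }
}
```

Because the caller sizes the buffer with maxEncodedLength and picks the offset, the encoded bytes can go directly into the final output without an intermediate array or copy.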

I have followed suit and added this method to Thrift. You can find it in TRUNK and the upcoming 0.3 release in the Utf8Helper class. If your Thrift structs are string-heavy, then this will give you a substantial boost in performance. Try it out and let me know how it goes!

Posted in Thrift

Thrift and pseudo-RDF schemas

BackType’s Nathan Marz recently wrote a really great post about using Thrift and an RDF-like schema to get type-safe, extensible, high-performance schemas for use in Hadoop environments. He really hit the nail on the head describing the use pattern and the positives and negatives.

A variation on this approach is something we’ve been doing at Rapleaf for a few years now. Our heavy use of Thrift, including for this style of data storage, is what led us to contribute patches for union support, amongst many other features. I’m really glad to see that others are making as much use of these features as we are.

One thing he’s doing that we’re not is treating different types of “relations” as first-class, separate data types. Instead, we have a single DataUnit class with all the possible attributes on it. This means that the information we can store about different node types isn’t as constrained. That has suited us up to this point because all of our data is about individuals. I could certainly see how, as we add different kinds of nodes (like Facebook Groups or corporations) into our system, Nathan’s strategy would be a big consistency improvement.

Additionally, our data model includes an extra field of general metadata we call Pedigree. This stores the when, where, and how elements of the data we collect. It’s not a significant extension to the data model Nathan describes, but it’s an important one.

All in all, this is a great step forward. Hope to see more of this kind of stuff out of you, Nathan!

Posted in Hadoop, Thrift

Accelerate your test suite with Cascading 1.1

One big downside of using Cascading for our applications has been the runtime of our regression test suite. We test with quantities of data nowhere near our regular production volume, but we still end up running lots of jobs. In our experience, this ends up making our tests take a long time (in the tens of minutes), killing our ability to iterate quickly.

After looking more deeply into the issue, we discovered that it all came down to one particular polling interval. When Cascading launches a Flow, it will launch one or more actual Hadoop jobs and then wait until each completes before launching the next in the pipeline. The problem is that the amount of time Cascading waits between checking job completion is set to something like 5 seconds by default! This setting makes plenty of sense in the case of real world jobs, which should all be at least minutes in length – 5 seconds isn’t going to make a difference one way or another. However, in our ultra-short job scenario, this makes all the difference. If your Flow works out to 10 jobs that run serially, the fastest it could complete is 50 seconds.

Initially, we customized Cascading 1.0.8 to reduce this wait time down to about 100 milliseconds. However, when we recently upgraded to Cascading 1.1, we were pleasantly surprised to find that this polling interval was now configurable. Generally, it looks something like this:

Map<Object, Object> properties = new HashMap<Object, Object>();
properties.put("cascading.flow.job.pollinginterval", 100);
new FlowConnector(properties).connect(...).complete();

With a convenient way to change this parameter, the only other thing we need is a convenient way to set its value per environment. Ideally, we’d like to leave the parameter alone during production runs and only set it low during our test suite. This is a little tricky because, unlike Hadoop, Cascading itself doesn’t provide any global configuration framework.

The solution we ended up going with was to provide a class with a static method for getting new FlowConnectors that replaces the standard constructor. This method allows the user to provide any options they need and merges in whatever the current environmental polling interval should be. It looks something like this:

import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowConnector;

public final class CascadingHelper {
  // Environment-wide defaults (e.g. the polling interval) merged into every FlowConnector
  public static final Map<Object, Object> DEFAULT_PROPERTIES = new HashMap<Object, Object>();

  private CascadingHelper() {}

  public static FlowConnector getFlowConnector() {
    return getFlowConnector(new HashMap<Object, Object>());
  }

  public static FlowConnector getFlowConnector(Map<Object, Object> properties) {
    // Defaults first, so caller-supplied properties win on conflicts
    Map<Object, Object> combined = new HashMap<Object, Object>(DEFAULT_PROPERTIES);
    combined.putAll(properties);
    return new FlowConnector(combined);
  }
}

Finally, in our test suite, we made all of our tests inherit from a common base test class that reconfigures the default polling interval in the class constructor. Voila, our test suite takes 40% less time!
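Stripped of the Cascading dependency, the merge order and the common test base class can be sketched as follows. FlowProperties and FastFlowTestBase are hypothetical names; the property key and the value of 100 come from the FlowConnector example earlier in this post.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Cascading-free stand-in for the helper's property merging.
final class FlowProperties {
  static final String POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
  static final Map<Object, Object> DEFAULT_PROPERTIES = new HashMap<Object, Object>();

  private FlowProperties() {}

  // Defaults first, then caller-supplied properties, which win on conflicts.
  static Map<Object, Object> merged(Map<Object, Object> properties) {
    Map<Object, Object> combined = new HashMap<Object, Object>(DEFAULT_PROPERTIES);
    combined.putAll(properties);
    return combined;
  }
}

// Hypothetical common base class: constructing any test that extends it
// lowers the default polling interval before Flows are built.
abstract class FastFlowTestBase {
  protected FastFlowTestBase() {
    FlowProperties.DEFAULT_PROPERTIES.put(FlowProperties.POLLING_INTERVAL, 100);
  }
}
```

Code that never instantiates the base class leaves DEFAULT_PROPERTIES empty, so Flows built outside the tests would keep Cascading’s default interval.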

The only downside we’re currently presented with is that if a user forgets to use either the FlowConnector provider or the test base class, then the Flows in the tests run slowly, so constant vigilance is required. Still, making this change has caused our build to run somewhere between 2x and 3x as fast, which is just plain awesome.

Posted in Cascading

Dealing with skewed key sizes in Cascading

Rapleaf indexes data from a wide variety of sources and across all different sorts of people. As a result, some of the people we analyze end up having a lot more data about them stored in our systems than others. For instance, your average person has around a hundred friends, whereas Ashton Kutcher’s Twitter account has millions.

This variance in the size of a “person” in our system has led us to experience some interesting problems when we do things like CoGroup or GroupBy in such a way that all of a person’s data is reduced together. In these cases, sometimes we end up with massively skewed task sizes and runtimes. Since there are generally only a few of these people with millions and millions of datapoints, instead of all reducers taking pretty much the same amount of time, a few will take much, much longer than the rest. This leads to performance degradation and, at times, outright job failure when an overlarge task becomes unable to report in.

The intuitive way to solve this problem is to decrease the amount of data that a given reducer is responsible for processing. However, doing so is not as easy as just increasing the number of reducers. Since the problem is not the aggregate amount of data that goes to an individual reducer, but the sheer number of values that just a few keys have, more reducers would just make the successful tasks smaller and the unsuccessful tasks more tightly focused on the few bad keys.

Instead, what you need to do is actually spread the data over more keys so that it will be distributed amongst reducers more evenly. The way that we have accomplished this is by doing what I’ve been thinking of as a “shuffle join”. Basically, you take your natural key and add some additional variation to it in order to form a synthetic key, and then do your joining or grouping on the synthetic key.

Let’s look at how to do this for the CoGroup use case in detail. First, pick some shuffle factor, which we’ll call N. This number should be chosen such that you are looking to send at most 1/N of the data for a given natural key to a single reducer. Next, you form your synthetic keys in all of your pipes. I think of my pipes as being of two different types: keys and values. Every value must find its key, but the keys don’t need to see every value. To form synthetic keys for your key pipe, emit a new tuple with the natural key plus each of the numbers between 0 and N-1. To form synthetic keys in the values pipe, emit the tuple with its natural key plus a random number between 0 and N-1. (Make sure you are using a deterministic means of selecting random numbers as discussed here.) Finally, perform your CoGroup on these pipes, specifying the join fields to be the fields that compose the synthetic key.
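Here is an in-memory sketch of the two kinds of synthetic keys. It is illustrative only: in Cascading the salt would be an extra tuple field, whereas this sketch joins it into a string as naturalKey#salt, and it assumes a java.util.Random with a fixed per-task seed as one possible deterministic source of random numbers. ShuffleJoin and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helpers for a "shuffle join" with shuffle factor n.
final class ShuffleJoin {
  private ShuffleJoin() {}

  // Simplification: the salt is appended to the key string instead of
  // being carried as a separate field.
  static String syntheticKey(String naturalKey, int salt) {
    return naturalKey + "#" + salt;
  }

  // Key pipe: copy the key once for every salt 0..n-1.
  static List<String> keySideKeys(String naturalKey, int n) {
    List<String> keys = new ArrayList<>(n);
    for (int salt = 0; salt < n; salt++) {
      keys.add(syntheticKey(naturalKey, salt));
    }
    return keys;
  }

  // Value pipe: one pseudo-random salt per value. A fixed seed makes a
  // re-executed task reproduce the same assignment for the same input order.
  static List<String> valueSideKeys(String naturalKey, int numValues, int n, long seed) {
    Random random = new Random(seed);
    List<String> keys = new ArrayList<>(numValues);
    for (int i = 0; i < numValues; i++) {
      keys.add(syntheticKey(naturalKey, random.nextInt(n)));
    }
    return keys;
  }
}
```

Every value-side key has a matching key-side copy, so the CoGroup still pairs each value with its key, but a natural key with a million values is spread over n groups of roughly a million / n values each.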

The result of this extra setup is that each of the numbers 0 to N-1 will be assigned to roughly 1/N of your “values”, and all of the keys will be “copied” with the additional 0 to N-1 field. When it goes into the join, each synthetic key will get roughly 1/N of the values for its natural key. For most reasonable values of N, this will cause your outlier keys to be split into manageable chunks, and your reduce tasks will become homogeneous again, giving you back consistent runtimes. After the CoGroup pipe, you’re free to strip the extra field off your tuple to get back your original tuple and go about your business.

This same strategy can be used to shuffle up GroupBy pipes as well, and more easily at that. Instead of worrying about key and value pipes, just treat everything like a value pipe and stick in the random number. Each natural key will then be split across up to N synthetic keys, and so may be seen in up to N different reducers.

The unfortunate downside of this strategy is that it’s not suitable for every possible use case, since any Every pipe downstream won’t get all the values for the natural key. If you are planning to do some aggregation, you need to consider if it can be done piecemeal or not. For instance, if you are just counting the records for each key, you can emit the value you get as a sort of subtotal and just put all the subtotals together after the fact, either with a little Java code or another set of pipes. (After all, the output of this step should be much smaller now, and is likely totally manageable.)
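For the counting example, the two steps can be sketched in memory: count per synthetic key, then strip the salt and sum the subtotals. SaltedCount is a hypothetical name; the sketch joins the salt into the key string as naturalKey#salt instead of using a separate field, and uses a seeded java.util.Random as its deterministic salt source.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical two-phase count for a salted GroupBy.
final class SaltedCount {
  private SaltedCount() {}

  // Phase 1: group on (naturalKey, salt) and count, giving one subtotal per synthetic key.
  static Map<String, Long> subtotals(List<String> naturalKeys, int n, long seed) {
    Random random = new Random(seed);
    Map<String, Long> counts = new HashMap<>();
    for (String key : naturalKeys) {
      counts.merge(key + "#" + random.nextInt(n), 1L, Long::sum);
    }
    return counts;
  }

  // Phase 2: strip the salt (everything after the last '#') and add up the subtotals.
  static Map<String, Long> totals(Map<String, Long> subtotals) {
    Map<String, Long> totals = new HashMap<>();
    for (Map.Entry<String, Long> e : subtotals.entrySet()) {
      String natural = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
      totals.merge(natural, e.getValue(), Long::sum);
    }
    return totals;
  }
}
```

The second phase only sees at most N subtotals per natural key, which is why it can be done with a little Java code or a cheap follow-up set of pipes.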

Posted in Cascading, MapReduce

Master Kozikowski

Piotr Kozikowski was one of our star interns during the summer of 2008 and we are excited to welcome him back as a full-time employee starting next week. Piotr recently published his Master’s thesis on data inference, in conjunction with Rapleaf. Here is the abstract of his very well-written paper. Congratulations Piotr!

Although social networking websites have existed for well over a decade, it is only recently (since approx. 2005) that they have become immensely popular. Today, social networking websites play a very important role in the lives of millions of people.

As the user base of social networking sites grows at a rapid pace, so do the opportunities to take advantage of the vast amount of data that becomes available as a result.

The purpose of this thesis is to use data gathered from social networking sites to develop techniques to infer unknown attributes of individuals based on other, known attributes, as well as attributes of people with whom the individuals have a relationship.

The main focus of the thesis is the inference of gender, age, and location. Other attributes are also analyzed, but in much less detail. Additionally, the possibility of assigning weights to the relations among individuals to provide a measure of friendship closeness is also explored.

The data analyzed does not come from one specific social networking website, but rather is composed of publicly available information coming from a number of such websites. It is hoped that this data is representative of the social web in general and, therefore, any findings will also be applicable to the social web in general.

Posted in Miscellaneous

Keeping It Clean

Like most engineering teams, we at Rapleaf do not write perfect code. Whether because of design shortsightedness, changing requirements, poor style, obsoleted functionality or a host of other factors, we often find ourselves wanting to clean up parts of our system. The need to clean up code is common to every individual and development organization, and much has been written about the costs of technical debt and tips for effective refactoring. Somewhat less discussed are the procedural strategies that organizations employ to encourage code cleanup; a team may know how to refactor, but without an effective mechanism it may still find itself paralyzed by cleanup that is too frequent, too infrequent, or unmotivated.

One of the core cleanup strategies that we use at Rapleaf revolves around an event we call “Sweepleaf”. Roughly every 6 weeks, we designate a day to be a special Sweepleaf day during which engineers set aside their regular work and focus exclusively on cleanup tasks. These tasks often involve rewriting or eliminating code, decommissioning obsolete services, or other work that yields a healthier codebase. At the end of the day, we all share in the catharsis of reporting how many lines of code we deleted or files we removed.

The key benefits of our Sweepleaf process are twofold. First is the obvious: Our code is much leaner, more readable, and generally healthier than it would be otherwise. In the absence of Sweepleaf, a multitude of cleanup tasks might not get addressed before they become crippling. One way I like to think about Sweepleaf is that it is an opportunity to take care of those tasks that are eternally second priority; they are the tasks that you really, really want to take care of when you run across them in the code, but a higher priority deliverable is always in the way.

The less obvious benefit is psychological. Because of the regularity of Sweepleaf, engineers are aware that they will have a cleanup opportunity in the near future, mitigating the frustration of living with imperfect code. They can immediately record the item in our active list of Sweepleaf tasks (maintained in Jira). Further, the simple knowledge that others are improving code quality is a boon to morale. One engineer on the team expresses his feeling that “there’s this great relief – your life just got easier. You get to bask in the glow of an agile culture. You’re lean and mean, and everyone on your team knows it.”

Sweepleaf on its own does not solve every cleanup issue. Some tasks are too large to be addressed within a day, and some are too urgent to be left alone for 6 weeks. In those cases we’ll spend time where it’s needed and make sure the job is done right, regardless of the day. On the whole, though, Sweepleaf both raises our “cleanup consciousness” and motivates us as a team to keep it clean.

Posted in Miscellaneous