Pseudo-Combiners in Cascading
To get maximum performance from MapReduce, you need to minimize the amount of data you transfer across the network. If nearly all of your input must be shuffled from your mappers to your reducers, you'll put a great deal of stress on your disks and network. A commonly recommended remedy is the combiner, which lets part of the reduce work happen during the map phase when the aggregation is associative and commutative, such as counting, summing, or finding a minimum or maximum. Combiners matter most when there are very few group keys, because without them huge numbers of tuples are funneled into a small number of reducers.
Unfortunately, while MapReduce supports combiners, Cascading does not. So we hacked together our own solution, which we call a “pseudo-combiner”. A traditional combiner maintains a buffer of tuples and sorts and aggregates them when the buffer fills up. Our pseudo-combiner instead maintains a map whose key is the group field and whose value is the combiner output. For every tuple, we apply the combiner logic as we update that key's entry in the map. This suits our most common use case, which involves counting billions of values in fewer than 100 categories. Since all of our categories fit easily in the map, each mapper emits only one output per key.
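To make the map-based idea concrete, here is a minimal Java sketch of map-side combining for a count, written without Cascading so it runs on its own. It assumes each input record has already been reduced to its group key (a String), and the class name InMapperCount is hypothetical. The map is unbounded, which is only safe when the number of distinct keys is small, as in the fewer-than-100-categories case described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: map-side combining of a count, one entry per group key.
class InMapperCount {
    // Returns one partial count per distinct key instead of one record per input.
    static Map<String, Long> combine(List<String> groupKeys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : groupKeys) {
            // First sighting stores 1; later sightings add 1 to the stored count.
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

Four input records with two distinct keys produce only two outputs from this mapper, which is exactly the reduction in shuffled data the post is after.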
Our implementation uses an LRUHashMap, an in-house extension of LinkedHashMap. LRUHashMap relies on LinkedHashMap to cap the number of entries in the map, evicting the eldest key-value pair whenever the map grows beyond its limit. The evicted pair is handed back to us so that we can emit the correct output for it. When all the input tuples have been read, we simply flush the map and emit a tuple for every remaining entry.
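A size-capped map of this kind can be sketched with LinkedHashMap's access-order constructor and its removeEldestEntry hook. This is an assumption about how such a class could work, not Rapleaf's LRUHashMap; the name CappedLruMap and the drainEvicted method are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a size-capped LinkedHashMap that remembers what it evicts.
class CappedLruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;
    private final List<Map.Entry<K, V>> evicted = new ArrayList<>();

    CappedLruMap(int maxEntries) {
        // accessOrder = true: get() and put() move an entry to the tail,
        // so the head (the "eldest") is the least recently used entry.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // LinkedHashMap calls this after each insertion.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        if (size() > maxEntries) {
            // Copy the pair before it is removed so the caller can emit it.
            evicted.add(new AbstractMap.SimpleImmutableEntry<>(eldest.getKey(), eldest.getValue()));
            return true;
        }
        return false;
    }

    // Returns and clears the pairs evicted since the last call.
    List<Map.Entry<K, V>> drainEvicted() {
        List<Map.Entry<K, V>> out = new ArrayList<>(evicted);
        evicted.clear();
        return out;
    }
}
```

A caller would invoke drainEvicted() after each put and emit a tuple for every pair it returns; at end of input, it would iterate over the remaining entries the same way.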
The abstract class we’ve designed has three methods that every combiner must implement:
import cascading.tuple.Tuple;
public abstract class PseudoCombiner<T> {  // class name is illustrative
  protected abstract T initialize(Tuple tuple);
  protected abstract void update(T toUpdate, Tuple newTuple);
  protected abstract Tuple getTuple(T mapValue);
}
The initialize method is called the first time we see a key and returns the initial combiner value to store for it. The update method is called whenever we see another tuple for a key we’ve already seen; it receives the current value from the map and the new tuple. Because update returns nothing, T has to be a mutable type that update can modify in place. Finally, getTuple is called whenever we need to decide what to emit for an entry in the map, which happens on eviction and when we flush the map at the end.
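The call sequence described above can be sketched as a small driver. This is a hypothetical illustration rather than the class from the post: SketchCombiner, process and flush are invented names, the group key is passed in separately, and List&lt;Object&gt; stands in for Cascading's Tuple so the example compiles without Cascading on the classpath. maxEntries is assumed to be at least 1.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical driver showing when initialize/update/getTuple are called.
// List<Object> stands in for cascading.tuple.Tuple.
abstract class SketchCombiner<T> {
    private final int maxEntries; // assumed >= 1
    // Access-ordered map: iteration starts at the least recently used key.
    private final LinkedHashMap<Object, T> state = new LinkedHashMap<>(16, 0.75f, true);

    SketchCombiner(int maxEntries) { this.maxEntries = maxEntries; }

    protected abstract T initialize(List<Object> tuple);
    protected abstract void update(T toUpdate, List<Object> newTuple);
    protected abstract List<Object> getTuple(T mapValue);

    // Feeds one input tuple; returns any outputs produced by eviction.
    List<List<Object>> process(Object key, List<Object> tuple) {
        List<List<Object>> out = new ArrayList<>();
        T current = state.get(key); // also marks key as most recently used
        if (current == null) {
            state.put(key, initialize(tuple)); // first sighting of this key
            if (state.size() > maxEntries) {
                // Evict the least recently used entry and emit its combined value.
                Iterator<Map.Entry<Object, T>> it = state.entrySet().iterator();
                Map.Entry<Object, T> eldest = it.next();
                out.add(emit(eldest.getKey(), eldest.getValue()));
                it.remove();
            }
        } else {
            update(current, tuple); // mutates the stored value in place
        }
        return out;
    }

    // Called once all input has been read: emit every remaining entry.
    List<List<Object>> flush() {
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<Object, T> e : state.entrySet()) {
            out.add(emit(e.getKey(), e.getValue()));
        }
        state.clear();
        return out;
    }

    // Output = group key followed by the fields returned by getTuple.
    private List<Object> emit(Object key, T value) {
        List<Object> result = new ArrayList<>();
        result.add(key);
        result.addAll(getTuple(value));
        return result;
    }
}
```

Because every output carries its group key, a downstream reducer can still group on that key and merge the partial results from each mapper.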
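Thus, our counting use case would look like this. Because java.lang.Long is immutable, incrementing a Long parameter inside update would not change the value stored in the map, so the count is kept in a one-element long[] that update can modify in place:
import cascading.tuple.Tuple;
public class CountCombiner extends PseudoCombiner<long[]> {  // class name is illustrative
  // When we see the first tuple for a key, we initialize the count to 1
  protected long[] initialize(Tuple tuple) {
    return new long[] {1};
  }
  // On each subsequent tuple, we increment the stored count in place
  protected void update(long[] toUpdate, Tuple newTuple) {
    toUpdate[0]++;
  }
  // When we need to emit a tuple for the key, we emit the count stored in the map
  protected Tuple getTuple(long[] mapValue) {
    return new Tuple(mapValue[0]);
  }
}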
Adopting these pseudo-combiners gave us a huge performance improvement: one of our stats jobs dropped from an hour to around two minutes, which means we can now run those stats hourly.