Cascading is an awesome tool, but there’s a particular situation I have run into a few times where the abstractions have broken down. The situation occurs when you have multiple pipes that you need to group together on a common key, but other than the key the pipes have different fields. Let’s use the following example:
Pipe “ages” contains 2-tuples of the form (“person_id”, “age”). Pipe “transactions” contains 4-tuples of the form (“person_id”, “transaction_id”, “transaction_type”, “transaction_date”). We want to write a job that emits the “person_id” of every person who is over 25 and has had at least 10 transactions. The straightforward way to do this with Cascading would be something like the following:
Pipe ages = new Pipe("ages");
Pipe transactions = new Pipe("transactions");
// Count transactions per person (this GroupBy forces the first job)
transactions = new GroupBy(transactions, new Fields("person_id"));
transactions = new Every(transactions, new Count(new Fields("count")));
transactions = new Each(transactions, new Fields("person_id", "count"), new Identity());
// FilterLessThan is assumed to be a custom filter (not shown) that removes tuples whose value is below the given number
transactions = new Each(transactions, new Fields("count"), new FilterLessThan(10));
ages = new Each(ages, new Fields("age"), new FilterLessThan(26));
// Join the two filtered pipes on person_id (this CoGroup forces the second job)
Pipe results = new CoGroup(ages, new Fields("person_id"), transactions, new Fields("person_id"),
    new Fields("person_id", "age", "person_id2", "count"), new InnerJoin());
results = new Each(results, new Fields("person_id"), new Identity());
This flow would result in two jobs, one necessitated by the GroupBy and one necessitated by the CoGroup. A faster way to do this would be to group both the transaction data and the age data into a single Buffer (or Aggregator) and perform all the filtering logic there. To group these tuples together into the same reducer, you have to change the tuples to “look” the same and add some sort of flag so you can distinguish the tuples. In this example it’s possible but not pretty.
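To make the “look the same, plus a flag” workaround concrete, here is a minimal sketch in plain Java. It does not use the Cascading API: the `Tagged` record, the two source flags, and the `select` method are all hypothetical names invented for illustration, and the in-memory map merely stands in for the shuffle that would bring one person's records to a single reducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Cascading.
final class TaggedGroupingSketch {
    // Hypothetical flags that say which input a record came from.
    static final int FROM_AGES = 0;
    static final int FROM_TRANSACTIONS = 1;

    // Every input is reshaped to the same layout: (person_id, source, age).
    // "age" is only meaningful when source == FROM_AGES; otherwise it is null.
    static final class Tagged {
        final String personId;
        final int source;
        final Integer age;

        Tagged(String personId, int source, Integer age) {
            this.personId = personId;
            this.source = source;
            this.age = age;
        }
    }

    static Tagged fromAge(String personId, int age) {
        return new Tagged(personId, FROM_AGES, age);
    }

    // The transaction columns are dropped here because only the count matters.
    static Tagged fromTransaction(String personId) {
        return new Tagged(personId, FROM_TRANSACTIONS, null);
    }

    // Groups all records by person_id, then applies both filters in one pass per group.
    static List<String> select(List<Tagged> records) {
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (Tagged t : records) {
            groups.computeIfAbsent(t.personId, k -> new ArrayList<>()).add(t);
        }
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> group : groups.entrySet()) {
            Integer age = null;
            int count = 0;
            for (Tagged t : group.getValue()) {
                if (t.source == FROM_AGES) {
                    age = t.age;
                } else {
                    count++;
                }
            }
            // Over 25 and at least 10 transactions, as in the example job.
            if (age != null && age > 25 && count >= 10) {
                selected.add(group.getKey());
            }
        }
        return selected;
    }
}
```

The awkward part is visible in `Tagged`: the two inputs share almost nothing, so the common layout carries a nullable field and a flag that every consumer has to check.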
To handle situations like this more elegantly, I created a new pipe type called “MultiGroupBy”. It’s a hack, but it works and I hope a proper implementation gets added to Cascading in the near future. Using this new pipe, my example job would be written as:
Pipe ages = new Pipe("ages");
Pipe transactions = new Pipe("transactions");
// will explain mysterious "6" later on
Pipe results = new MultiGroupBy(new Pipe[] {ages, transactions}, new Fields("person_id"), 6, new SelectTargetIds());
results = new Each(results, new Fields("person_id"), new Identity());
...
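public static class SelectTargetIds extends MultiBuffer {
    public SelectTargetIds() {
        // declares the result fields of this buffer
        super(new Fields("selected"));
    }

    @Override
    protected void operate() {
        // iterator 0 is the "ages" pipe: tuples of (person_id, age)
        Iterator<Tuple> agesIt = getArgumentsIterator(0);
        if (!agesIt.hasNext()) return;
        int age = agesIt.next().getInteger(1);

        // iterator 1 is the "transactions" pipe; only the count matters
        int count = 0;
        Iterator<Tuple> transactionsIt = getArgumentsIterator(1);
        while (transactionsIt.hasNext()) {
            count++;
            transactionsIt.next();
        }

        // "over 25" means strictly greater than 25, matching FilterLessThan(26) above
        if (count >= 10 && age > 25) {
            emit(new Tuple(true)); // need to emit something
        }
    }
}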
This is simpler and will be faster because it results in only a single job. In the custom function provided to the MultiGroupBy, an iterator to each input pipe can be obtained separately via the “getArgumentsIterator” function. The custom function’s constructor declares the result fields, and the result of the MultiGroupBy will be the custom function’s result fields appended to the grouping field.
MultiGroupBy works by hijacking “CoGroup”. The CoGroup pipe allows different kinds of joins to be specified, such as “InnerJoin” and “LeftJoin”. These join implementations get iterators to each side of the join as input and need to produce an iterator to the joined tuples as output. MultiGroupBy works by using a custom join operation and executes the user’s code within the CoGroup. This is where the hacks begin.
CoGroup expects its results to be a join, so it expects the number of fields in the resulting tuples to be the sum of the number of fields of the input pipes. MultiGroupBy works around this by inserting “dummy” fields into the tuples so that everything matches up. Unfortunately, MultiGroupBy doesn’t know how many dummy fields to create since there’s no way to ask a pipe how many fields it has. This is where the mysterious “6” comes from in the example: the user has to tell MultiGroupBy the total number of fields across all input pipes, and here “ages” has 2 fields and “transactions” has 4. A second problem with this hack is that the MultiBuffer cannot output more fields than this “sum of all input pipe fields” number.
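The padding idea can be sketched in a few lines of plain Java. This is not the code from MultiGroupBy.java: the `pad` helper is hypothetical, it uses `null` as the dummy value, and it assumes the grouping field counts toward the total width, which may not match the real layout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the actual padding logic in MultiGroupBy may differ.
final class DummyFieldPaddingSketch {
    // Builds one output row of exactly totalInputFields values:
    // grouping values first, then the emitted values, then null fillers.
    static List<Object> pad(List<Object> groupValues, List<Object> emitted, int totalInputFields) {
        int used = groupValues.size() + emitted.size();
        if (used > totalInputFields) {
            // Mirrors the limitation described above: there is no room for extra fields.
            throw new IllegalArgumentException(
                "row needs " + used + " fields but only " + totalInputFields + " are available");
        }
        List<Object> row = new ArrayList<>(totalInputFields);
        row.addAll(groupValues);
        row.addAll(emitted);
        while (row.size() < totalInputFields) {
            row.add(null); // dummy field
        }
        return row;
    }
}
```

With the example job, one “person_id” value plus one “selected” value would be padded with four dummy fields to reach the width of 6.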
The second hack is how MultiGroupBy deals with the results of the MultiBuffer. MultiBuffers output their results using the “emit” method, while standard Cascading operations output directly into an output collector. The reason for this is that CoGroup expects the results in an iterator, so MultiGroupBy collects all the results in a SpillableTupleList and then gives the CoGroup an iterator to the list.
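The collect-then-iterate pattern looks roughly like the following plain-Java sketch. The class and method names here are hypothetical, the inputs are simplified to an iterator of ages and an iterator of transaction ids, and an ordinary `ArrayList` stands in for the SpillableTupleList, so nothing spills to disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the emit mechanism; not the real MultiBuffer class.
abstract class EmitBufferSketch {
    // Results are buffered here instead of being written to an output collector.
    private final List<List<Object>> results = new ArrayList<>();

    // User code calls emit() once per output row.
    protected void emit(Object... values) {
        results.add(Arrays.asList(values));
    }

    // User logic for one grouping key.
    protected abstract void operate(Iterator<Integer> ages, Iterator<String> transactions);

    // Runs the user logic, then hands back an iterator over everything emitted,
    // which is the shape a join implementation is expected to return.
    public final Iterator<List<Object>> run(Iterator<Integer> ages, Iterator<String> transactions) {
        results.clear();
        operate(ages, transactions);
        return new ArrayList<>(results).iterator();
    }
}

// Same selection logic as the example job, expressed against the sketch above.
final class SelectTargetIdsSketch extends EmitBufferSketch {
    @Override
    protected void operate(Iterator<Integer> ages, Iterator<String> transactions) {
        if (!ages.hasNext()) return;
        int age = ages.next();
        int count = 0;
        while (transactions.hasNext()) {
            transactions.next();
            count++;
        }
        if (count >= 10 && age > 25) {
            emit(true);
        }
    }
}
```

Buffering everything before returning the iterator is what makes a spillable list attractive in the real implementation: a key with a very large number of emitted rows would otherwise have to fit in memory.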
MultiGroupBy has been really useful. If you’re interested, you can download the MultiGroupBy code here: MultiGroupBy.java
