import java.io.Serializable; import java.util.Arrays; import java.util.Iterator; import cascading.operation.Identity; import cascading.pipe.CoGroup; import cascading.pipe.Each; import cascading.pipe.Pipe; import cascading.pipe.SubAssembly; import cascading.pipe.cogroup.GroupClosure; import cascading.pipe.cogroup.Joiner; import cascading.tuple.Fields; import cascading.tuple.SpillableTupleList; import cascading.tuple.Tuple; public class MultiGroupBy extends SubAssembly { public abstract static class MultiBuffer implements Serializable { private transient GroupClosure _closure = null; private Fields _resultFields; private SpillableTupleList _results; private int _pipeFieldsSum; public MultiBuffer(Fields resultFields) { _resultFields = resultFields; } public Fields getResultFields() { return _resultFields; } public void setContext(int pipeFieldsSum, GroupClosure closure) { _closure = closure; _results = new SpillableTupleList(); _pipeFieldsSum = pipeFieldsSum; } public SpillableTupleList getResults() { return _results; } protected abstract void operate(); protected void emit(Tuple result) { Tuple ret = new Tuple(_closure.getGrouping()); ret.addAll(result); while(ret.size() < _pipeFieldsSum) { ret.add(0); } _results.add(ret); } protected Iterator getArgumentsIterator(int pos) { return _closure.getIterator(pos); } } protected static class MultiGroupJoiner implements Joiner { protected MultiBuffer _buffer; private int _pipeFieldsSum; public MultiGroupJoiner(int pipeFieldsSum, MultiBuffer buffer) { _buffer = buffer; _pipeFieldsSum = pipeFieldsSum; } public Iterator getIterator(GroupClosure closure) { _buffer.setContext(_pipeFieldsSum, closure); _buffer.operate(); return _buffer.getResults().iterator(); } public int numJoins() { return -1; } } public MultiGroupBy(Pipe p0, Pipe p1, Fields groupFields, int pipeFieldsSum, MultiBuffer operation) { Pipe[] pipes = new Pipe[] { p0, p1}; Fields[] fields = new Fields[] {groupFields, groupFields}; init(pipes, fields, pipeFieldsSum, groupFields, operation); } public MultiGroupBy(Pipe p0, Fields group0, Pipe p1, Fields group1, int pipeFieldsSum, Fields groupRename, MultiBuffer operation) { Pipe[] pipes = new Pipe[] { p0, p1}; Fields[] fields = new Fields[] {group0, group1}; init(pipes, fields, pipeFieldsSum, groupRename, operation); } public MultiGroupBy(Pipe[] pipes, Fields groupFields, int pipeFieldsSum, MultiBuffer operation) { Fields[] allGroups = new Fields[pipes.length]; Arrays.fill(allGroups, groupFields); init(pipes, allGroups, pipeFieldsSum, groupFields, operation); } public MultiGroupBy(Pipe[] pipes, Fields[] groupFields, int pipeFieldsSum, Fields groupingRename, MultiBuffer operation) { init(pipes, groupFields, pipeFieldsSum, groupingRename, operation); } protected void init(Pipe[] pipes, Fields[] groupFields, int pipeFieldsSum, Fields groupingRename, MultiBuffer operation) { Fields resultFields = Fields.join(groupingRename, operation.getResultFields()); if(resultFields.size()>pipeFieldsSum) throw new IllegalArgumentException("Can't have output more than sum of input pipes since this is a hack!"); // unfortunately, need to hack around CoGroup validation stuff since cascading assumes it will return #fields=sum of input pipes Fields fake = new Fields(); fake = fake.append(resultFields); int i=0; while(fake.size() < pipeFieldsSum) { fake = fake.append(new Fields("__" + i)); i++; } Pipe result = new CoGroup(pipes, groupFields, fake, new MultiGroupJoiner(pipeFieldsSum, operation)); result = new Each(result, resultFields, new Identity()); setTails(result); } }