There are multiple ways of getting data out of HDFS onto a local machine that does not belong to the HDFS cluster. Which method to use depends on the needs of the data transfer.
The simplest way of getting data out of HDFS onto a non-cluster machine is to use the commands built into the hadoop script. One example is the -copyToLocal command of hadoop fs, which copies files to the local file system one by one, in sequence. It’s easy to use and fine for moving a small number of files, but copying many files this way takes a long time.
A second method is to run distcp with a local jobtracker. This adds some complexity but brings in the ability to do more rsync-like operations than -copyToLocal offers. As with -copyToLocal, running distcp this way gives you a single thread: even though distcp is capable of copying in parallel, there is no parallelism here and files are moved over one by one. Note that the file URL has three forward slashes: two belong to the URL syntax and the third is the root of the local file system. All file URLs must be absolute paths starting from / (slash).
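To make the slash rule concrete, here is a small sketch that uses only java.net.URI (no Hadoop classes). The helper LocalFileUrls.toFileUrl and the path /tmp/hdfs-copy are hypothetical, made up for illustration. It shows that in file:///tmp/hdfs-copy the authority is empty and the path starts at the root.

```java
import java.net.URI;

// Illustration only, plain java.net.URI: in "file:///abs/path" the "//" opens an
// empty authority and the third "/" is the start of the absolute path.
final class LocalFileUrls {
    private LocalFileUrls() {}

    // Hypothetical helper: builds a local file URL, rejecting relative paths
    // because every file URL has to start from the root (/).
    static URI toFileUrl(String absolutePath) {
        if (!absolutePath.startsWith("/")) {
            throw new IllegalArgumentException("path must start at /: " + absolutePath);
        }
        return URI.create("file://" + absolutePath);
    }
}
```

With only two slashes, as in file://tmp/hdfs-copy, java.net.URI reports tmp as the host and /hdfs-copy as the path, which is almost certainly not the intended local directory.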
The third and final method I’ll go over is to set up a pseudo-distributed system that uses the cluster’s HDFS for distcp. This method is a lot more involved but gives you all the features of distcp (rsync-like operations and parallelism) when copying files.
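The practical difference between these methods is sequential versus parallel copying. The sketch below is only a local-disk analogy using java.nio.file and a thread pool, not Hadoop code: copySequential mimics the one-file-at-a-time behavior of -copyToLocal (or distcp with a local jobtracker), and copyParallel mimics distcp spreading files over several map tasks. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Local-disk analogy only: neither method talks to HDFS.
final class CopyStrategies {
    private CopyStrategies() {}

    // One file at a time, like -copyToLocal or distcp with a local jobtracker.
    static void copySequential(List<Path> sources, Path targetDir) throws IOException {
        for (Path src : sources) {
            Files.copy(src, targetDir.resolve(src.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
    }

    // Up to `workers` copies in flight at once, roughly like distcp map slots.
    static void copyParallel(List<Path> sources, Path targetDir, int workers)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Path>> pending = new ArrayList<>();
            for (Path src : sources) {
                Callable<Path> task = () ->
                    Files.copy(src, targetDir.resolve(src.getFileName()), StandardCopyOption.REPLACE_EXISTING);
                pending.add(pool.submit(task));
            }
            // Wait for every copy and surface the first failure as an IOException.
            for (Future<Path> f : pending) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    throw new IOException("copy failed", e.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

The thread count plays the same role as the number of map slots configured in the next step: more workers only help while the disk and network can keep up.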
The first step is to copy the conf/hadoop-site.xml file from the HDFS cluster. This file should contain all the information needed to connect to HDFS. You will have to add or change the value of mapred.job.tracker so that it points to the local machine, and add or change the values for the maximum number of map and reduce tasks. These are usually set to the number of cores on the local machine. All other values should stay the same.
<property>
  <name>mapred.job.tracker</name>
  <value>localmachine.domain:7277</value>
</property>
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>8</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>8</value>
</property>
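If you prefer to derive these three overrides instead of typing them, a hypothetical helper like the one below renders them with both slot counts set to the local core count (Runtime.availableProcessors()), following the rule of thumb above. The class name, host and port are placeholders; the output is meant to be pasted into the copied hadoop-site.xml, not to replace it.

```java
// Hypothetical helper: renders the three property overrides for a local
// jobtracker, using the same value for map and reduce slots.
final class LocalJobTrackerSite {
    private LocalJobTrackerSite() {}

    static String property(String name, String value) {
        return "<property>\n  <name>" + name + "</name>\n  <value>" + value + "</value>\n</property>\n";
    }

    // jobTrackerHostPort points at the local machine, e.g. "localmachine.domain:7277".
    static String overrides(String jobTrackerHostPort, int cores) {
        String slots = Integer.toString(cores);
        return property("mapred.job.tracker", jobTrackerHostPort)
            + property("mapred.tasktracker.map.tasks.maximum", slots)
            + property("mapred.tasktracker.reduce.tasks.maximum", slots);
    }

    // Same as overrides(), with the slot count taken from this JVM's view of the CPU count.
    static String overridesForThisMachine(String jobTrackerHostPort) {
        return overrides(jobTrackerHostPort, Runtime.getRuntime().availableProcessors());
    }
}
```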
Now make sure the conf/masters file contains just “localhost” and the conf/slaves file contains the domain name of the local machine. Once that is done, start a local jobtracker and tasktracker. If everything went well, you should be able to see the jobtracker UI on that machine by going to http://domainname:50030.
bin/hadoop-daemon.sh start jobtracker
bin/hadoop-daemon.sh start tasktracker
You should now be able to run distcp in a distributed fashion, which should greatly speed up moving large amounts of data out of HDFS.


A new Cascading pipe - MultiGroupBy
Cascading is an awesome tool, but there’s a particular situation I have run into a few times where the abstractions have broken down. The situation occurs when you have multiple pipes that you need to group together on a common key, but other than the key the pipes have different fields. Let’s use the following example:
Pipe “ages” contains 2-tuples of the form (“person_id”, “age”). Pipe “transactions” contains 4-tuples of the form (“person_id”, “transaction_id”, “transaction_type”, “transaction_date”). We want to write a job that emits the “person_id” of every person who is over 25 and has had at least 10 transactions. The straightforward way to do this with Cascading would be something like the following:
Pipe ages = new Pipe("ages");
Pipe transactions = new Pipe("transactions");
// count transactions per person, then drop people with fewer than 10
transactions = new GroupBy(transactions, new Fields("person_id"));
transactions = new Every(transactions, new Count(new Fields("count")));
transactions = new Each(transactions, new Fields("person_id", "count"), new Identity());
transactions = new Each(transactions, new Fields("count"), new FilterLessThan(10));
// FilterLessThan(n) removes tuples whose value is below n, so this keeps ages 26 and up
ages = new Each(ages, new Fields("age"), new FilterLessThan(26));
// join on person_id; the second person_id is renamed to avoid a field-name clash
Pipe results = new CoGroup(ages, new Fields("person_id"), transactions, new Fields("person_id"),
    new Fields("person_id", "age", "person_id2", "count"), new InnerJoin());
results = new Each(results, new Fields("person_id"), new Identity());
This flow would result in two jobs, one necessitated by the GroupBy and one necessitated by the CoGroup. A faster way to do this would be to group both the transaction data and the age data into a single Buffer (or Aggregator) and perform all the filtering logic there. To group these tuples together into the same reducer, you have to change the tuples to “look” the same and add some sort of flag so you can distinguish the tuples. In this example it’s possible but not pretty.
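To show what grouping everything into a single reducer involves, here is an in-memory sketch in plain Java (no Cascading or Hadoop). Each record carries a source tag (0 for ages, 1 for transactions), records are grouped by person_id with one list per source, and the same filtering rule is applied per group. The Tagged type and the method names are hypothetical; in a real job the grouping happens in the shuffle, not in a TreeMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of "make the tuples look the same and add a flag".
final class TaggedGrouping {
    private TaggedGrouping() {}

    // A tagged record: source 0 = ages, source 1 = transactions; fields[0] is person_id.
    static final class Tagged {
        final int source;
        final Object[] fields;
        Tagged(int source, Object... fields) {
            this.source = source;
            this.fields = fields;
        }
    }

    // The "reduce" side: per person_id, one list of tuples per input source.
    static Map<String, List<List<Object[]>>> group(List<Tagged> records, int numSources) {
        Map<String, List<List<Object[]>>> groups = new TreeMap<>();
        for (Tagged r : records) {
            List<List<Object[]>> perSource = groups.computeIfAbsent((String) r.fields[0], k -> {
                List<List<Object[]>> lists = new ArrayList<>();
                for (int i = 0; i < numSources; i++) {
                    lists.add(new ArrayList<>());
                }
                return lists;
            });
            perSource.get(r.source).add(r.fields);
        }
        return groups;
    }

    // Same rule as the job above: older than 25 and at least 10 transactions.
    static List<String> selectTargetIds(List<Tagged> records) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, List<List<Object[]>>> e : group(records, 2).entrySet()) {
            List<Object[]> ages = e.getValue().get(0);
            List<Object[]> transactions = e.getValue().get(1);
            if (ages.isEmpty()) {
                continue; // no age known for this person
            }
            int age = (Integer) ages.get(0)[1];
            if (age > 25 && transactions.size() >= 10) {
                selected.add(e.getKey());
            }
        }
        return selected;
    }
}
```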
To handle situations like this more elegantly, I created a new pipe type called “MultiGroupBy”. It’s a hack, but it works and I hope a proper implementation gets added to Cascading in the near future. Using this new pipe, my example job would be written as:
Pipe ages = new Pipe("ages");
Pipe transactions = new Pipe("transactions");
// will explain mysterious "6" later on
Pipe results = new MultiGroupBy(new Pipe[] {ages, transactions}, new Fields("person_id"), 6, new SelectTargetIds());
results = new Each(results, new Fields("person_id"), new Identity());
...
public static class SelectTargetIds extends MultiBuffer {
    public SelectTargetIds() {
        super(new Fields("selected"));
    }

    @Override
    protected void operate() {
        // input 0 is the "ages" pipe: (person_id, age)
        Iterator<Tuple> agesIt = getArgumentsIterator(0);
        if(!agesIt.hasNext()) return; // no age on record for this person
        int age = agesIt.next().getInteger(1);
        // input 1 is the "transactions" pipe: just count its tuples
        int count = 0;
        Iterator<Tuple> transactionsIt = getArgumentsIterator(1);
        while(transactionsIt.hasNext()) {
            count++;
            transactionsIt.next();
        }
        // "over 25" means strictly greater than 25
        if(count >= 10 && age > 25) {
            emit(new Tuple(true)); //need to emit something
        }
    }
}
This is simpler and will be faster because it results in only a single job. In the custom function provided to the MultiGroupBy, an iterator to each input pipe can be obtained separately via the “getArgumentsIterator” function. The custom function’s constructor declares the result fields, and the result of the MultiGroupBy will be the custom function’s result fields appended to the grouping field.
MultiGroupBy works by hijacking “CoGroup”. The CoGroup pipe allows different kinds of joins to be specified, such as “InnerJoin” and “LeftJoin”. These join implementations receive iterators over each side of the join as input and must produce an iterator over the joined tuples as output. MultiGroupBy supplies a custom join operation and runs the user’s code inside the CoGroup. This is where the hacks begin.
CoGroup expects its results to be a join, so it expects the number of fields in the resulting tuples to be the sum of the number of fields of the input pipes. MultiGroupBy works around this by inserting “dummy” fields into the tuples so that everything matches up. Unfortunately, MultiGroupBy doesn’t know how many dummy fields to create, since there’s no way to ask a pipe how many fields it has. This is where the mysterious “6” in the example comes from: the user has to tell MultiGroupBy the total number of fields across all input pipes (2 for “ages” plus 4 for “transactions”). A second problem with this hack is that the MultiBuffer cannot output more fields than this “sum of all input pipe fields” number.
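The width bookkeeping can be sketched in a few lines of plain Java. This is a hypothetical illustration of the rule just described, not MultiGroupBy’s actual code: the expected width is the sum of the input field counts (2 + 4 = 6 here), a narrower result is padded with null dummies, and a wider one is rejected. Exactly where MultiGroupBy places the grouping field and the dummies is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the dummy-field rule: every result tuple must be
// exactly as wide as all input pipes combined.
final class DummyPadding {
    private DummyPadding() {}

    // Width CoGroup expects, e.g. expectedWidth(2, 4) for ages + transactions.
    static int expectedWidth(int... inputFieldCounts) {
        int sum = 0;
        for (int n : inputFieldCounts) {
            sum += n;
        }
        return sum;
    }

    // Pads a result with null dummies up to `width`; a wider result cannot fit.
    static List<Object> pad(List<Object> result, int width) {
        if (result.size() > width) {
            throw new IllegalArgumentException(
                "result has " + result.size() + " fields but only " + width + " are available");
        }
        List<Object> padded = new ArrayList<>(result);
        while (padded.size() < width) {
            padded.add(null);
        }
        return padded;
    }
}
```

The exception branch is the second limitation in miniature: no amount of padding lets a buffer emit more fields than the declared total.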
The second hack is how MultiGroupBy deals with the results of the MultiBuffer. MultiBuffers output their results using the “emit” method, whereas standard Cascading operations write directly to an output collector. The reason is that CoGroup expects the results as an iterator, so MultiGroupBy collects all the emitted results in a SpillableTupleList and then gives the CoGroup an iterator over that list.
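The emit-then-iterate pattern looks roughly like the following. It is a hypothetical sketch, not the MultiBuffer source: an ArrayList stands in for SpillableTupleList (which, unlike a plain list, can spill to disk when a group produces many results), and runAndIterate plays the role of the custom join handing CoGroup an iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: user code pushes results through emit(), and the driver
// only hands them on, as an iterator, after operate() has finished.
abstract class BufferingOperation<T> {
    // Stand-in for SpillableTupleList; everything stays in memory here.
    private final List<T> collected = new ArrayList<>();

    // Called by user code for each result, in the spirit of MultiBuffer.emit.
    protected final void emit(T result) {
        collected.add(result);
    }

    // User logic for one group.
    protected abstract void operate();

    // Runs the user logic to completion, then exposes the buffered results.
    final Iterator<T> runAndIterate() {
        collected.clear();
        operate();
        return new ArrayList<>(collected).iterator();
    }
}
```

Buffering is the price of adapting a push-style API (emit) to a pull-style one (an iterator): every result of a group has to be held somewhere until the iterator is requested, which is why a spillable list is the safer choice in the real code.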
MultiGroupBy has been really useful. If you’re interested, you can download the MultiGroupBy code here: MultiGroupBy.java