A new Cascading pipe - MultiGroupBy

Cascading is an awesome tool, but there’s a particular situation I have run into a few times where the abstractions have broken down. The situation occurs when you have multiple pipes that you need to group together on a common key, but other than the key the pipes have different fields. Let’s use the following example:

Pipe “ages” contains 2-tuples of the form (“person_id”, “age”). Pipe “transactions” contains 4-tuples of the form (“person_id”, “transaction_id”, “transaction_type”, “transaction_date”). We want to write a job that emits the “person_id” of every person who is over 25 and has had at least 10 transactions. The straightforward way to do this with Cascading would be something like the following:

Pipe ages = new Pipe("ages");
Pipe transactions = new Pipe("transactions");

transactions = new GroupBy(transactions, new Fields("person_id"));
transactions = new Every(transactions, new Count(new Fields("count")));
transactions = new Each(transactions, new Fields("person_id", "count"), new Identity());
transactions = new Each(transactions, new Fields("count"), new FilterLessThan(10));

ages = new Each(ages, new Fields("age"), new FilterLessThan(26));

Pipe results = new CoGroup(ages, new Fields("person_id"), transactions, new Fields("person_id"),
   new Fields("person_id", "age", "person_id2", "count"), new InnerJoin());
results = new Each(results, new Fields("person_id"), new Identity());

This flow would result in two jobs, one necessitated by the GroupBy and one necessitated by the CoGroup. A faster way to do this would be to group both the transaction data and the age data into a single Buffer (or Aggregator) and perform all the filtering logic there. To group these tuples together into the same reducer, you have to change the tuples to “look” the same and add some sort of flag so you can distinguish the tuples. In this example it’s possible but not pretty.
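To make the manual workaround concrete, here is a rough sketch in plain Java with no Cascading involved. Everything in it is hypothetical: the class name, the tag constants, and the int[] layout (personId, tag, age) are stand-ins for reshaping both pipes to one common tuple shape plus a flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the manual workaround: every record is reshaped
// to {personId, tag, age} so that both sources can share a single grouping.
class TaggedGrouping {
  static final int AGE = 0;          // record came from the "ages" pipe
  static final int TRANSACTION = 1;  // record came from the "transactions" pipe

  // Each int[] is {personId, tag, age}; the age slot is ignored for transactions.
  static List<Integer> select(List<int[]> tagged) {
    // Group by personId, as one reducer-side grouping would.
    Map<Integer, List<int[]>> groups = new TreeMap<>();
    for (int[] t : tagged) {
      groups.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);
    }

    List<Integer> selected = new ArrayList<>();
    for (Map.Entry<Integer, List<int[]>> group : groups.entrySet()) {
      Integer age = null;
      int count = 0;
      for (int[] t : group.getValue()) {
        if (t[1] == AGE) {
          age = t[2];
        } else {
          count++;
        }
      }
      // Over 25 and at least 10 transactions.
      if (age != null && age > 25 && count >= 10) {
        selected.add(group.getKey());
      }
    }
    return selected;
  }
}
```

The flag is what makes this “not pretty”: every record carries a slot that only makes sense for one of the two sources, and the grouping logic has to branch on it.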

To handle situations like this more elegantly, I created a new pipe type called “MultiGroupBy”. It’s a hack, but it works and I hope a proper implementation gets added to Cascading in the near future. Using this new pipe, my example job would be written as:

Pipe ages = new Pipe("ages");
Pipe transactions = new Pipe("transactions");

// will explain mysterious "6" later on
Pipe results = new MultiGroupBy(new Pipe[] {ages, transactions}, new Fields("person_id"), 6, new SelectTargetIds());
results = new Each(results, new Fields("person_id"), new Identity());

...

public static class SelectTargetIds extends MultiBuffer {

   public SelectTargetIds() {
     super(new Fields("selected"));
   }

   @Override
   protected void operate() {
      Iterator<Tuple> agesIt = getArgumentsIterator(0);
      if(!agesIt.hasNext()) return;
      int age = agesIt.next().getInteger(1);
     
      int count = 0;
      Iterator<Tuple> transactionsIt = getArgumentsIterator(1);
      while(transactionsIt.hasNext()) {
        count++;
        transactionsIt.next();
      }
     
      if (count >= 10 && age > 25) { // "over 25", matching the first example
        emit(new Tuple(true)); //need to emit something
      }      
   }
   
}

This is simpler and will be faster because it results in only a single job. In the custom function provided to the MultiGroupBy, an iterator to each input pipe can be obtained separately via the “getArgumentsIterator” function. The custom function’s constructor declares the result fields, and the result of the MultiGroupBy will be the custom function’s result fields appended to the grouping field.

MultiGroupBy works by hijacking “CoGroup”. The CoGroup pipe allows different kinds of joins to be specified, such as “InnerJoin” and “LeftJoin”. These join implementations get iterators to each side of the join as input and need to produce an iterator to the joined tuples as output. MultiGroupBy works by using a custom join operation and executes the user’s code within the CoGroup. This is where the hacks begin.

CoGroup expects its results to be a join, so it expects the number of fields in the resulting tuples to be the sum of the number of fields of the input pipes. MultiGroupBy works around this by inserting “dummy” fields into the tuples so that everything matches up. Unfortunately, MultiGroupBy doesn’t know how many dummy fields to create since there’s no way to ask a pipe how many fields it has. This is where the mysterious “6” comes from in the example: the user has to tell MultiGroupBy the total number of fields across all input pipes (here, 2 fields from “ages” plus 4 from “transactions”). A second problem with this hack is that the MultiBuffer cannot output more fields than this “sum of all input pipe fields” number.
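The padding idea can be illustrated with a small stand-alone sketch. This is not the actual MultiGroupBy code; the class and method names are made up, and plain Object[] rows stand in for Cascading tuples.

```java
import java.util.Arrays;

// Hypothetical illustration of the padding idea; not the actual MultiGroupBy code.
class DummyFieldPadding {
  // Widens a result row to the width a join is expected to have (the sum of
  // all input pipe field counts), filling the extra slots with null.
  static Object[] pad(Object[] result, int totalInputFields) {
    if (result.length > totalInputFields) {
      throw new IllegalArgumentException(
          "result has " + result.length + " fields, limit is " + totalInputFields);
    }
    return Arrays.copyOf(result, totalInputFields);
  }

  public static void main(String[] args) {
    // "ages" has 2 fields and "transactions" has 4, hence 6.
    Object[] padded = pad(new Object[] {"person-1", true}, 2 + 4);
    System.out.println(Arrays.toString(padded));
  }
}
```

The length check mirrors the second limitation mentioned above: a row wider than the declared total has nowhere to go.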

The second hack is how MultiGroupBy deals with the results of the MultiBuffer. MultiBuffers output their results using the “emit” method, while standard Cascading operations output directly into an output collector. The reason for this is that CoGroup expects the results in an iterator, so MultiGroupBy collects all the results in a SpillableTupleList and then gives the CoGroup an iterator to the list.
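A minimal sketch of that emit-then-iterate pattern, under the assumption that an ordinary ArrayList is good enough for illustration (the real code uses a SpillableTupleList, which can spill to disk). The class name and Object[] rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the emit-then-iterate pattern; a plain ArrayList
// stands in for the SpillableTupleList mentioned above.
abstract class CollectingBuffer {
  private List<Object[]> results = new ArrayList<>();

  // Called by operate() once per output row.
  protected void emit(Object... values) {
    results.add(values);
  }

  // User logic goes here.
  protected abstract void operate();

  // Runs the user logic, then exposes everything it emitted as an iterator,
  // which is the shape a join implementation has to hand back.
  Iterator<Object[]> run() {
    results = new ArrayList<>();
    operate();
    return results.iterator();
  }
}
```

The cost of this design is that nothing reaches downstream until operate() has finished for the whole group, which is why a spillable list matters for large groups.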

MultiGroupBy has been really useful. If you’re interested, you can download the MultiGroupBy code here: MultiGroupBy.java


Multiple ways of copying data out of HDFS

There are multiple ways of getting data out of HDFS onto a local machine that does not belong to the HDFS cluster. Which one to use depends on the needs of the data transfer.

The simplest way of getting data out of HDFS onto a non-cluster machine is to use the functions built into the hadoop script. One example is the -copyToLocal flag. This copies files to the local file system one by one, in sequence. It’s easy to use and fine for a small number of files, but copying many files this way will take time.

hadoop fs -copyToLocal hdfs://name-node/path/to/dir /path/to/local/dir

A second method is to use distcp with a local jobtracker. This adds some complexity but allows more rsync-type operations than -copyToLocal does. As with -copyToLocal, running distcp this way gives you a single thread, so there is no parallelism to the copy (even though distcp is capable of it) and files are copied over one by one. One thing to note is that the file URL does have 3 forward slashes: 2 to define the URL and 1 to define the root of the filesystem. All file URLs must be absolute paths starting from / (slash).

hadoop distcp -jt local hdfs://name-node/path/to/dir file:///path/to/local/dir
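The reason for the third slash is easy to check with java.net.URI. The sketch below is only an illustration of URL parsing; the class and method names are made up and nothing here touches Hadoop.

```java
import java.net.URI;

// Hypothetical helper that shows how a file URL is split into its parts.
class FileUrlCheck {
  // Returns the path component of a file URL.
  static String localPath(String url) {
    URI uri = URI.create(url);
    if (!"file".equals(uri.getScheme())) {
      throw new IllegalArgumentException("not a file URL: " + url);
    }
    return uri.getPath();
  }

  public static void main(String[] args) {
    // Three slashes: empty host, path starts at the filesystem root.
    System.out.println(localPath("file:///path/to/local/dir"));
    // Two slashes: "path" is parsed as the host and drops out of the path.
    System.out.println(localPath("file://path/to/local/dir"));
  }
}
```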

The third and final method I’ll go over is to set up a pseudo-distributed system that uses the cluster’s HDFS for distcp.  This method is a lot more involved but gives you all the features of distcp (rsync-like and parallelism) when copying files.

The first step is to copy the conf/hadoop-site.xml file from the HDFS cluster.  This file should contain all information on how to connect to the HDFS.  You will have to add or change the value for mapred.job.tracker to point to the local machine, and add or change the values for the number of mappers and reducers.  Usually these equal the number of cores on the local machine.  All other values should stay the same.

<property>
  <name>mapred.job.tracker</name>
  <value>localmachine.domain:7277</value>
</property>
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>8</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>8</value>
</property>

Now make sure the conf/masters file contains just “localhost” and the conf/slaves file contains the domain name of the local machine.  Once that is done, start up a local jobtracker and tasktracker. If everything went well, you should be able to see the jobtracker UI on that machine at http://domainname:50030.

bin/hadoop-daemon.sh start jobtracker
bin/hadoop-daemon.sh start tasktracker

Now you should be able to run distcp in a distributed fashion, which should be a big improvement when moving large amounts of data out of HDFS.

hadoop distcp hdfs://name-node/path/to/dir file:///path/to/local/dir

Backing up Hadoop’s HDFS

Very little information can be found on methods of backing up data on a Hadoop cluster. To be clear, by Hadoop cluster I mean HDFS. Any solution would have to take a few things into account. The first is how to keep your data in multiple places for safety. There is information on how people run multiple clusters and simply copy the data between them. As well, and before there’s controversy, replication is not a form of backup. Yes, HDFS does have a replication factor, and that value is used to keep multiple copies of each file. This does help keep data safe in case of node failure, but deleting a file still deletes the file, and once that happens there’s no turning back.

The next issue has to do with the sheer volume of data that can be stored in HDFS. The idea of backing up the data on HDFS to tape can be quite daunting at first, while at the same time being very enticing for off-site/disaster-recovery scenarios. There is no way you can stage all the data in HDFS on some RAID or NAS device before going down to tape - at least not without spending a ton of money on a custom solution from one of the storage vendors.

Our Solution

Here is one approach to backing up HDFS using the time-stamp of the files on HDFS to create a pseudo-incremental backup. We wrote a program that takes four parameters:

  • Parameter #1 is a date in the past
  • Parameter #2 is a maximum amount of data to be downloaded
  • Parameter #3 is the path on HDFS to copy files from
  • Parameter #4 is the path to the staging area to copy files to

The first parameter tells the program to back up only files with a time-stamp equal to or later than the date given. This does mean traversing the entire HDFS directory structure for files and their time-stamps as well as sorting by date. In the big scheme of things the time this takes, as compared to moving all files down to the staging area, is acceptable.

The second parameter is there to stop the program from filling the staging area and making the backup crash. It limits how much data the program downloads from HDFS. There is a caveat to this. If the rate of growth of your data in HDFS per backup cycle is greater than the space in the staging area, this approach will never work, as the backup will never catch up. As well, if this type of backup is started after there is already data in HDFS, it will take some time to catch up to the data created daily. Depending on how much data is already in HDFS and how much is added daily, the staging area might need to be very large to get all the data down to tape fast enough to catch up.

The last two parameters are used to determine what directories on HDFS are to be backed-up and where they should land on the staging area.

Our tape backup solution can run a script before and after the data has been moved to tape. Before the data is dumped to tape the program described above is run using an initial time-stamp. The program prints out the time-stamp of the last file it downloaded. The staging area is then dumped to tape. The backup program is set up to always do incremental backups of the staging area. Once that is over the time-stamp printed out is taken and set up to be used as the input for the program’s next run. The backup program now thinks it has been doing incremental backups of the same data set even though it has really only been seeing the incremental downloads the program has created.
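The selection step described above (filter by time-stamp, sort by date, stop at the size limit, report the last time-stamp) can be sketched roughly as follows. This is not the program linked from this post: the class, the FileInfo type, and the method names are hypothetical, and the file list is assumed to have been collected already by walking HDFS.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the selection step; not the linked implementation.
class IncrementalSelection {
  // Minimal stand-in for what a directory walk over HDFS would collect per file.
  static final class FileInfo {
    final String path;
    final long modifiedMillis;
    final long sizeBytes;

    FileInfo(String path, long modifiedMillis, long sizeBytes) {
      this.path = path;
      this.modifiedMillis = modifiedMillis;
      this.sizeBytes = sizeBytes;
    }
  }

  // Returns the oldest files modified at or after sinceMillis whose combined
  // size fits within maxBytes, in time-stamp order.
  static List<FileInfo> select(List<FileInfo> all, long sinceMillis, long maxBytes) {
    List<FileInfo> candidates = new ArrayList<>();
    for (FileInfo f : all) {
      if (f.modifiedMillis >= sinceMillis) {
        candidates.add(f);
      }
    }
    candidates.sort(Comparator.comparingLong((FileInfo f) -> f.modifiedMillis));

    List<FileInfo> chosen = new ArrayList<>();
    long total = 0;
    for (FileInfo f : candidates) {
      if (total + f.sizeBytes > maxBytes) {
        break; // stop at the first file that does not fit, so no gap is left behind
      }
      total += f.sizeBytes;
      chosen.add(f);
    }
    return chosen;
  }

  // The time-stamp to feed into the next run: that of the last file taken,
  // or the unchanged starting point if nothing was selected.
  static long nextStart(List<FileInfo> chosen, long sinceMillis) {
    return chosen.isEmpty() ? sinceMillis : chosen.get(chosen.size() - 1).modifiedMillis;
  }
}
```

Two consequences of this sketch are worth noting. Because the comparison is “equal to or later than”, the last file of one run is picked up again by the next run. And a single file larger than the limit would stall the sketch entirely, which is one more reason the staging area has to be sized generously.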

Even with the caveats presented, this approach has proven to work quite well: we haven’t needed a large staging area to back up massive amounts of information in a way that lets us recover from file deletion and other catastrophes.

Here is an example implementation of the incremental HDFS backup.


Thrift Union Pattern

At Rapleaf, we use Thrift structs as the basic cornerstone of many of our processes. I won’t go into great detail talking about Thrift in general, but suffice it to say that Thrift is easy and flexible enough to be used as the primary means of storing data and communicating between our various components.

One limitation of Thrift is that it supports neither polymorphism nor variant types. Superficially, this would seem to indicate that some use cases would be cumbersome to implement. For instance, a service with a generic “processMessage” method couldn’t work, since you’d need a method per message type. However, we’ve found a design pattern that allows us to get the functionality we want very easily with a minimum amount of complexity, which we’ve been calling a “Thrift union”.

Let’s look at an example Thrift file:

struct StructA {
  // StructA-specific fields
}

struct StructB {
  // StructB-specific fields
}

struct StructAOrStructB {
  // Indicates which of the optional fields below is set
  1: required i32 specific_struct_type;

  2: optional StructA struct_a;
  3: optional StructB struct_b;
}

StructAOrStructB contains two very important parts. First, it contains a bunch of optional fields that could contain any of the possible subtypes. Second, it contains a required field that indicates which of the optional fields should be used. Effectively, the required field encodes the type and name of the whole struct, and by convention, none of the other optional fields are set.

There you have it. What you’ve created is a union-like structure that can only take on the types and names that you’ve pre-allowed. Note that this is not a true variant type (since it can’t contain just any value), but in practice, it seems that a true variant isn’t all that useful. It is much better to predefine what types you can understand up front so that you know what to do in every case; after all, that’s what the Thrift IDL is there to do, right?
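On the consuming side, a generic “processMessage” method then becomes a switch on the type field. The sketch below uses hand-written stand-ins rather than Thrift-generated classes, and the type constants (1 and 2), the field contents, and the factory methods are all assumptions made for illustration.

```java
// Hypothetical hand-written stand-ins for the classes Thrift would generate.
class ThriftUnionSketch {
  static final int TYPE_STRUCT_A = 1;
  static final int TYPE_STRUCT_B = 2;

  static final class StructA {
    final String label;

    StructA(String label) {
      this.label = label;
    }
  }

  static final class StructB {
    final int amount;

    StructB(int amount) {
      this.amount = amount;
    }
  }

  static final class StructAOrStructB {
    final int specificStructType;
    final StructA structA; // set only when the type is TYPE_STRUCT_A
    final StructB structB; // set only when the type is TYPE_STRUCT_B

    private StructAOrStructB(int type, StructA a, StructB b) {
      this.specificStructType = type;
      this.structA = a;
      this.structB = b;
    }

    // Factories keep the convention: exactly one optional field is set.
    static StructAOrStructB of(StructA a) {
      return new StructAOrStructB(TYPE_STRUCT_A, a, null);
    }

    static StructAOrStructB of(StructB b) {
      return new StructAOrStructB(TYPE_STRUCT_B, null, b);
    }
  }

  // One generic entry point that dispatches on the required type field.
  static String processMessage(StructAOrStructB message) {
    switch (message.specificStructType) {
      case TYPE_STRUCT_A:
        return "handled A: " + message.structA.label;
      case TYPE_STRUCT_B:
        return "handled B: " + message.structB.amount;
      default:
        throw new IllegalArgumentException("unknown type " + message.specificStructType);
    }
  }
}
```

The default branch is where the “predefine what you understand” argument shows up in code: an unknown type is an explicit error rather than a silent pass-through.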

This approach seems to perform pretty well, though there is at least a minor performance hit to very large structs with lots of optional fields. I think that serialization speed would roughly double (in Java) if we were clever enough to only serialize the one field that was set. To that end, we’ve opened an issue to create a more proper union implementation in Thrift itself.


“Multi-threaded” MySQL Replication

Several of our MySQL slave databases have excessive replication lag. They continue to fall further and further behind the master and will never catch up. One solution is to resync the master/slave occasionally. This is a manual and time-consuming process. Another solution is to partition the database. This requires code updates and is also time-consuming.

We have a database that can easily be partitioned by table. Instead of partitioning the master and slave, we decided to try replication filters and only partition the slave.

MySQL Split Replication

One advantage is that no code updates are required when accessing the master. Another advantage is that we can run two MySQL slaves on the same hardware that one slave was running on. MySQL replication is single-threaded, which is often the reason for excessive replication lag. By running multiple MySQL instances, each of which replicates half the tables, you essentially get “multi-threaded” replication. Using the same hardware, we now have a slave configuration that is able to keep up with the master.

There are several different ways to write replication filters, but here is one example from my.cnf:

# Slave A (would contain tables 1-5)
replicate-ignore-table = app_production.table_6
...
replicate-ignore-table = app_production.table_10

# Slave B (would contain tables 6-10)
replicate-ignore-table = app_production.table_1
...
replicate-ignore-table = app_production.table_5

Kickstarting machines using GRUB

All of the machines in Rapleaf’s server environment are installed via PXE and Kickstart. At the moment this involves programmatically changing the boot order of the machine to boot off the network when the machine needs to be rebooted.  We do this via IPMI.  The new machines purchased for our new Hadoop cluster don’t have IPMI, so to PXE boot them someone needs to go down to the colo, plug in a monitor and keyboard, and press F12 as soon as the machine boots.  This is less than optimal.

This is where GRUB has come in very handy. There is a special kernel+initrd combination needed for Kickstart.  These files can be found under /images/pxeboot on your installation media.  If the kernel+initrd pair is placed on the host machine somewhere GRUB can get to it (usually /boot), GRUB can be configured to boot it.  To boot off the kernel+initrd pair, GRUB needs to know which parameters to boot with. The following should be appended to /boot/grub/grub.conf and made the default boot entry for the machine.

title KickLeaf
root (hd0,0)
kernel /vmlinuz ksdevice=eth0 ks=http://74.125.67.100/kickstart/ks.file
initrd /initrd.img

  • The first line is just the title that will appear on the GRUB boot menu.
  • The second line tells GRUB which partition holds the files; (hd0,0) is the first partition on the first disk.
  • The third line points to the kernel that will be used to Kickstart the machine. The “ksdevice=” tells the kernel which of multiple network cards will be used to connect to the network. The “ks=” points to the Kickstart file that will be used for this machine.
  • The last line defines the initrd used for the Kickstart installation.

It might not be obvious but the machine will not continue to boot into this Kickstart mode and install and reboot forever in a loop. Part of the Kickstart is to reformat the drives on the machines and recreate the partitions and the master boot record. Once the machine reboots GRUB will not have an entry for the kernel+initrd needed to Kickstart.  With this we are able to re-Kickstart all cluster machines remotely in ~10 minutes without having to use IPMI or plug in a monitor and keyboard to any of the machines.


bash-ful http response codes with Apache

For our recent scheduled downtime we wanted to replace the entire web site with a maintenance page. That’s not so bad. The simplest solution is to add an AliasMatch directive to the Apache config:

AliasMatch .* /var/www/rapleaf/maintenance.html

Unfortunately this was insufficient for us because we also wanted to make it a 503: Service Unavailable http response. You might think (as I did) that you could do this with a RedirectMatch and an ErrorDocument,

RedirectMatch 503 .*
ErrorDocument 503 /maintenance.html

but you would (as I was) be wrong. The problem is that the second argument to ErrorDocument is a URL path, so Apache serves it through an internal redirect rather than by simply reading a local file. The redirected request for the ErrorDocument matches the RedirectMatch, which sends it to the ErrorDocument, etc. It doesn’t work and it’s not good. Now I think there must be some way to write the regular expression in RedirectMatch so that it excludes /maintenance.html. Something like

RedirectMatch 503 (?!/maintenance.html).

but so far nothing I’ve tried works.
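One thing that can be checked outside Apache is the regular expression itself. The sketch below uses java.util.regex, assuming Apache searches for the pattern anywhere in the URL path the way find() does. The anchored variant is a hypothetical alternative, and whether it is enough to make the RedirectMatch plus ErrorDocument setup work in Apache is untested here.

```java
import java.util.regex.Pattern;

// Regex-only check; says nothing about how Apache handles the 503 itself.
class LookaheadCheck {
  // The attempt from the post, unanchored.
  static final Pattern UNANCHORED = Pattern.compile("(?!/maintenance.html).");

  // Hypothetical anchored variant: the lookahead is only tried at the start.
  static final Pattern ANCHORED = Pattern.compile("^(?!/maintenance\\.html$)");

  // True if the pattern is found anywhere in the path.
  static boolean matches(Pattern pattern, String path) {
    return pattern.matcher(path).find();
  }

  public static void main(String[] args) {
    // true: the search slides one character right and matches at "maintenance.html"
    System.out.println(matches(UNANCHORED, "/maintenance.html"));
    // false: the only allowed position is excluded by the lookahead
    System.out.println(matches(ANCHORED, "/maintenance.html"));
    // true: any other path still matches
    System.out.println(matches(ANCHORED, "/whatever"));
  }
}
```

So at the regex level the unanchored pattern never actually excludes /maintenance.html, which is consistent with that attempt not breaking the loop.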

So instead, we used ScriptAliasMatch.

ScriptAliasMatch .* /var/www/rapleaf/maintenance.sh

where maintenance.sh is pretty much the shortest, silliest cgi ever

#!/bin/bash

# CGI response: headers, then a blank line, then the body.
cat <<__HEADER__
Status: 503
Content-type: text/html

__HEADER__

cat /var/www/rapleaf/maintenance.html

The part to note here is how one uses ‘Status’ to set the http response code.

And testing with nc,

[eric@chuck-norris ~]# echo -e 'HEAD /whatever HTTP/1.0\r\nHost:www.rapleaf.com\r\n\r\n' | nc www 80
HTTP/1.1 503 Service Temporarily Unavailable
Date: Wed, 18 Feb 2009 01:36:18 GMT
Server: Apache/2.2.9 (Unix) mod_ssl/2.2.9 OpenSSL/0.9.8b DAV/2
Connection: close
Content-Type: text/html

Ta-da!


Minimizing downtime when copying a Xen image

Recently we moved Rapleaf’s servers to a new co-location facility.  For this we had to move a couple of VMs containing core services to a new Xen hypervisor in the new facility. One of these VMs runs a critical service that many other services depend on, so it was important to minimize downtime. The image is about 11 GB, and the VM can’t be running while the copy is happening because that would corrupt the filesystem inside the copied image.

Luckily the Xen hypervisor it was on had some unused space in LVM. This means we could create an LVM snapshot and copy that. This still doesn’t solve the problem that the VM would be running at the point of the snapshot, so we still have the problem of corrupting the filesystem inside the VM.

With Xen one can “pause” a VM. What this does is take the VM out of the CPU scheduler therefore “freezing” the VM in whatever state it is in. This stops any activity that would make the filesystem inside the VM corrupt.

So, to copy the VM we need to pause it, create the lvm snapshot and unpause the VM as quickly as possible.  The example below assumes the id for the VM is 1 and lives on logical volume /dev/VolGroup00/LogVol00.

~]# xm pause 1
~]# lvcreate -L15G -s -n vmsnapshot /dev/VolGroup00/LogVol00
~]# xm unpause 1

This ran in 0.06 seconds and no services, as far as I know, even felt the “hiccup” with the machine. Then the snapshot was mounted on the filesystem, the image was copied over, and the snapshot was unmounted and destroyed.

~]# mount /dev/VolGroup00/vmsnapshot /mnt/snapshot
~]# cp /mnt/snapshot/var/lib/xen/images/server_image_1 /tmp
~]# umount /mnt/snapshot
~]# lvremove -f /dev/VolGroup00/vmsnapshot


ruby and mcrypt

Usually there’s little reason to use mcrypt with ruby. Ruby’s standard OpenSSL library should cover just about all of one’s encryption/decryption needs. But recently we did need mcrypt to help us debug an encryption problem with a partner. Specifically they were encrypting with mcrypt on php and we were having trouble decrypting it.

There’s very little documentation for Rb-mcrypt, the ruby wrapper around mcrypt and no examples. So after a little bit of diving into the code and reading the mcrypt documentation, I came up with this:

plaintext = 'sssh.  secret message.'
puts plaintext

algorithm = 'blowfish'
mode = 'cbc'
key = 'this is the secret key'
iv = 'this is the initial vector'

enc = MCrypt::Crypt.new
enc.init(algorithm, mode, key, iv)
encrypted = enc.encode(plaintext)
enc.close

dec = MCrypt::Crypt.new
dec.init(algorithm, mode, key, iv)
decrypted = dec.decode(encrypted)
dec.close

puts decrypted

See mutils/mcrypt.h to see the possible values for algorithm and mode.

Here’s something strange. If you put a puts plaintext after the call to encode(), you get gibberish! It looks like encode() (and decode()) screw up their arguments. Darn pass by reference! I don’t know how to stop it from happening. Just be aware that it does. Use .dup or something to protect your original plaintext string.


Graceful shutdown, Hadoop, and black magic

Recently, while working on the Collector, I noticed that we had an issue with graceful shutdown of our servers. The Collector uses a JVM shutdown hook to catch the SIGTERM and take some cleanup actions before allowing the exit to go on. However, every time I would try to gracefully shut down a server, I’d get an error in the logs from Hadoop about the FileSystem already being closed. As a result, attempts to close in-progress files would fail.

Now, this was truly perplexing to me. Who was closing FileSystems? A thorough dig through all Rapleaf code found me no calls to FileSystem.closeAll(). I thought it might have been that due to the way the shutdown progressed, it wasn’t truly a graceful shutdown. An afternoon’s worth of refactoring proved that not to be the case.

Finally, I decided to go looking into Hadoop’s code itself to see if I might find an offender. Sure enough, in FileSystem.java, as soon as you open a FileSystem, HDFS registers its own shutdown hook! When the JVM is exiting, HDFS tries to clean up all its open FileSystems. For client applications, this makes a fair amount of sense - you can basically connect to whatever filesystems you want with impunity, and HDFS will make sure it cleans up after you. However, for server applications, this is almost definitely the wrong thing to do. Unexpected side effects are the bane of tightly controlled workflows.

To make matters worse, the JVM’s spec for shutdown hooks seems very naive. When there are multiple shutdown hooks registered, they are run in an undefined order, or even concurrently. Either way, HDFS’s shutdown hook was definitely running in advance of mine, and thus was heading off my graceful shutdown efforts.

So, now that the problem is found, what to do? Firstly, FileSystem exposes no methods for disabling this shutdown hook. Secondly, the JVM has methods for removing a particular shutdown hook, but offers no methods for enumerating the existing shutdown hooks in the first place, so you have to have the hook object already to disable it. To cap it all off, the shutdown hook object is kept in a private static member of the FileSystem class, so there’s no way to get at it.

Or is there? It turns out that through the use of some clever Java reflection, you can make that private static member accessible and get its value. Then, once you have it, you can deregister it as a shutdown hook and execute it at your leisure. Here’s the code I ended up creating to work around the problem:

  // Holds HDFS's own shutdown hook so it can be run later, after our cleanup.
  private Thread hdfsClientFinalizer;

  private void suppressHdfsShutdownHook() {
    try {
      // clientFinalizer is a private static field, so reflection is needed.
      Field field = FileSystem.class.getDeclaredField("clientFinalizer");
      field.setAccessible(true);
      hdfsClientFinalizer = (Thread) field.get(null); // null target: static field
      if (hdfsClientFinalizer == null) {
        throw new RuntimeException("client finalizer is null, can't suppress!");
      }
      Runtime.getRuntime().removeShutdownHook(hdfsClientFinalizer);
    } catch (NoSuchFieldException nsfe) {
      LOG.fatal("Couldn't find field 'clientFinalizer' in FileSystem!", nsfe);
      throw new RuntimeException("Failed to suppress HDFS shutdown hook", nsfe);
    } catch (IllegalAccessException iae) {
      LOG.fatal("Couldn't access field 'clientFinalizer' in FileSystem!", iae);
      throw new RuntimeException("Failed to suppress HDFS shutdown hook", iae);
    }
  }
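Once the hook has been deregistered, your own shutdown hook can run it explicitly after its own cleanup. Here is a minimal stand-alone sketch of that ordering using only java.lang.Runtime; the “library hook” is a made-up stand-in for the HDFS finalizer, and the event list exists only to make the order visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demonstration of taking over a shutdown hook's timing.
class OrderedShutdownSketch {
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    Runtime runtime = Runtime.getRuntime();

    // Stand-in for a hook registered by a library (HDFS in the post).
    Thread libraryHook = new Thread(() -> events.add("library cleanup"));
    runtime.addShutdownHook(libraryHook);

    // removeShutdownHook returns true if the hook was previously registered.
    boolean removed = runtime.removeShutdownHook(libraryHook);
    events.add("removed: " + removed);

    // What your own shutdown hook would then do, in a defined order.
    events.add("own cleanup");
    libraryHook.run(); // run(), not start(): executes on the calling thread
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Calling run() directly keeps everything on one thread, so the library’s cleanup cannot race with yours.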

Long term, the right thing to do is to patch HDFS itself to allow disabling the shutdown hook. I’ve opened an issue to that effect here.
