We have been doing a lot of batch processing with Hadoop MapReduce lately, and we quickly realized how painful it can be to write MapReduce jobs by hand. Some parts of our workflow require up to TEN MapReduce jobs to execute in sequence, requiring a lot of hand-coordination of intermediate data and execution order. Additionally, anyone who has done really complex MapReduce workflows knows how hard it is to keep “thinking” in MapReduce.
Luckily, we discovered a great new open source product called Cascading which has alleviated a ton of our pain. Cascading is the brainchild and work of Chris Wensel, and he’s done a great job developing an API which solves many of our problems. Cascading abstracts away MapReduce into a more natural logical model and provides a workflow management layer to handle things like intermediate data and data staleness.
Cascading’s logical model abstracts away MapReduce into a convenient tuples, pipes, and taps model. Data is represented as “Tuples”, a named list of objects. For example, I can have a tuple (“url”, “stats”), where “url” is a Hadoop “Text” object and “stats” is my own “UrlStats” complex object, containing methods for getting “numberOfHits” and “averageTimeSpent”. Tuples are kept together in “streams”, and all tuples in a stream have the exact same fields.
An operation on a stream of tuples is called a “Pipe”. There are a few kinds of pipes, each encompassing a category of transformations on a tuple stream. For instance, the “Each” pipe will apply a custom function to each individual tuple. The “GroupBy” pipe will group tuples together by a set of fields, and the “Every” pipe will apply an “aggregator function” to all tuples in a group at once.
Here’s a more in-depth example of “Each”. Suppose our tuple stream is of the form (“number1”, “number2”) and looks like:
(1, 2) (5, 7) (12, 5)
Assume this stream currently exists in a pipe called “workflow”. Now suppose we have a class “Double” that implements “operate” and which will double its argument and output the result in a field called “double”. Let’s look at the following code:
[cc lang="java"]
// "workflow" contains tuples of the form ("number1", "number2")
workflow = new Each(workflow, new Fields("number1"), new Double(),
    new Fields("number2", "double"));
[/cc]
This code applies the “Double” operation to each tuple in the stream. The second parameter indicates that the “number1” field should be used as the argument to the function. After the Double function completes, we have a 3-tuple of the form (“number1”, “number2”, “double”). The last argument indicates that we only want to pass on “number2” and “double” to the next pipe. Running this code on the above example will produce the following tuple stream (with fields “number2”, “double”):
(2, 2) (7, 10) (5, 24)
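To make the semantics of the Each step concrete, here is a minimal plain-Java sketch of what happens to each tuple. It does not use the Cascading API: the class EachSketch and the method applyDouble are hypothetical names invented for this illustration, and tuples are modeled as int arrays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the Each step; NOT Cascading code.
// EachSketch and applyDouble are hypothetical names for this sketch.
final class EachSketch {

    // Input tuples have fields (number1, number2).
    // Output tuples have fields (number2, double), where double = 2 * number1.
    static List<int[]> applyDouble(List<int[]> tuples) {
        List<int[]> out = new ArrayList<>();
        for (int[] t : tuples) {
            int number1 = t[0];         // argument selector: "number1"
            int number2 = t[1];
            int doubled = 2 * number1;  // the function's result field: "double"
            // output selector: keep only ("number2", "double")
            out.add(new int[] {number2, doubled});
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> input = Arrays.asList(
            new int[] {1, 2}, new int[] {5, 7}, new int[] {12, 5});
        for (int[] t : applyDouble(input)) {
            System.out.println(Arrays.toString(t));
        }
        // prints [2, 2] then [7, 10] then [5, 24], one per line
    }
}
```

The intermediate 3-tuple never needs to be materialized here: the function result is computed and the output selector is applied in the same loop iteration.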
Now let’s take a closer look at “GroupBy” and “Every” with the classic word count example. Suppose we have a pipe called “workflow” which contains a tuple stream of the form (“word”, “count”). Suppose these tuples were generated by counting the number of times a word appeared on a line of a text file. Here’s the code to generate tuples of the form (“word”, “total”) with one tuple per word:
[cc lang="java"]
workflow = new GroupBy(workflow, new Fields("word"));
workflow = new Every(workflow, new Fields("count"), new Sum(new Fields("total")),
    new Fields("word", "total"));
[/cc]
That’s it! Let’s take a look at what this does on an example:
("banana", 10)
("rose", 2)
("sleep", 6)
("rose", 7)
("rose", 10)
("banana", 2)
The GroupBy step will emit the following “group stream”:
"banana":
("banana", 10)
("banana", 2)
"rose":
("rose", 2)
("rose", 7)
("rose", 10)
"sleep":
("sleep", 6)
The Every step will collapse these tuples into the form (“word”, “total”) and produce:
("banana", 12)
("rose", 19)
("sleep", 6)
As in the Each example, the second argument to “Every” indicates that the “count” field should be summed over. The Fields declaration inside the “Sum” function indicates that the output should be named “total”. Finally, the last Fields declaration indicates that “word” and “total” should be passed along to the next pipe.
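The two steps can also be sketched in plain Java, without the Cascading API, to show what grouping and aggregating amount to. GroupSumSketch, groupByWord and sumEachGroup are hypothetical names made up for this illustration, and the demo uses only the banana and rose tuples from the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of GroupBy followed by Every(Sum); NOT Cascading code.
// GroupSumSketch, groupByWord and sumEachGroup are hypothetical names.
final class GroupSumSketch {

    // "GroupBy" step: collect the counts of each word into one group.
    // A TreeMap keeps the group keys in sorted order.
    static Map<String, List<Integer>> groupByWord(String[] words, int[] counts) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            groups.computeIfAbsent(words[i], k -> new ArrayList<>()).add(counts[i]);
        }
        return groups;
    }

    // "Every" step: collapse each group into a single ("word", "total") entry.
    static Map<String, Integer> sumEachGroup(Map<String, List<Integer>> groups) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int total = 0;
            for (int count : group.getValue()) {
                total += count;
            }
            totals.put(group.getKey(), total);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The "banana" and "rose" tuples from the example, in their original order.
        String[] words = {"banana", "rose", "rose", "rose", "banana"};
        int[] counts = {10, 2, 7, 10, 2};
        System.out.println(sumEachGroup(groupByWord(words, counts)));
        // prints {banana=12, rose=19}
    }
}
```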
One of the most powerful features of Cascading is the ability to fork and merge pipes together. For example, if you have one pipe with tuples of the form (“customer_id”, “name”) and another pipe of the form (“cust_id”, “age”), Cascading makes it super easy to join these pipes together to get tuples of the form (“name”, “age”). The operation is called “CoGroup” and can do inner, outer, or mixed joins. For this example, the code would look like:
[cc lang="java"]
Pipe namePipe; // this contains ("customer_id", "name")
Pipe agePipe; // this contains ("cust_id", "age")
// do the join
Pipe workflow = new CoGroup(namePipe, new Fields("customer_id"),
    agePipe, new Fields("cust_id"), new Fields("id1", "name", "id2", "age"));
// strip away the "id" fields
workflow = new Each(workflow, new Fields("name", "age"), new Identity());
[/cc]
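For readers who want to see what the inner-join case computes, here is a rough plain-Java sketch. It is not Cascading code: JoinSketch and innerJoin are hypothetical names, the sample customers are made up, and the sketch assumes each id appears at most once in each stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of an inner join on customer id; NOT Cascading code.
// JoinSketch and innerJoin are hypothetical names, and the sample data is made up.
final class JoinSketch {

    // nameById models the ("customer_id", "name") stream,
    // ageById models the ("cust_id", "age") stream.
    // Assumes each id appears at most once per stream.
    static List<String> innerJoin(Map<Integer, String> nameById,
                                  Map<Integer, Integer> ageById) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : nameById.entrySet()) {
            Integer age = ageById.get(entry.getKey());
            if (age != null) {
                // inner join: keep only ids present on both sides, then drop the ids
                joined.add("(" + entry.getValue() + ", " + age + ")");
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> names = new LinkedHashMap<>();
        names.put(1, "alice");
        names.put(2, "bob");
        names.put(3, "carol");
        Map<Integer, Integer> ages = new LinkedHashMap<>();
        ages.put(1, 30);
        ages.put(3, 25);
        ages.put(4, 40);
        System.out.println(innerJoin(names, ages));
        // prints [(alice, 30), (carol, 25)]
    }
}
```

An outer join would differ only in what happens when one side has no match: instead of skipping the id, it would emit the tuple with the missing field left empty.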
Once you have constructed your operations into a “pipe assembly”, you then tell Cascading how to retrieve and persist the data using an abstraction called “Tap”. “Taps” know how to convert stored data into Tuples and vice versa, and have complete control over how and where the data is stored. Cascading has a lot of built-in taps – for example, taps for SequenceFiles and text formats on HDFS. If you want to store data in your own format, you can define your own Tap. We have done this here at Rapleaf and it has worked seamlessly.
Once you have your taps defined, you hook up one Tap as the “source” to your pipe assembly, and another tap as the “sink” to your pipe assembly. This creates a “Flow”. Running the flow will read in the input set of tuples from the source Tap, run the tuples through the pipe assembly, and then write out the final output tuples into the sink Tap.
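The relationship between source, pipe assembly, and sink can be pictured with a small plain-Java sketch. None of this is the Cascading API: SourceTap, SinkTap and runFlow are hypothetical stand-ins, tuples are modeled as plain strings, and an in-memory list plays the role of real storage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of source tap -> pipe assembly -> sink tap; NOT Cascading code.
// SourceTap, SinkTap and runFlow are hypothetical stand-ins for this sketch.
final class FlowSketch {

    interface SourceTap { List<String> read(); }            // stored data -> tuples
    interface SinkTap { void write(List<String> tuples); }  // tuples -> stored data

    // "Running the flow": read from the source, apply the assembly, write to the sink.
    static void runFlow(SourceTap source,
                        Function<List<String>, List<String>> assembly,
                        SinkTap sink) {
        sink.write(assembly.apply(source.read()));
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();  // in-memory stand-in for real storage
        runFlow(
            () -> Arrays.asList("rose", "banana"),
            tuples -> {
                List<String> upper = new ArrayList<>();
                for (String t : tuples) {
                    upper.add(t.toUpperCase());
                }
                return upper;
            },
            tuples -> stored.addAll(tuples));
        System.out.println(stored);
        // prints [ROSE, BANANA]
    }
}
```

The point of the separation is that the assembly in the middle never needs to know where the tuples came from or where they end up, which is what makes swapping in a custom storage format possible.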
Internally, Cascading translates the pipe assembly into a series of MapReduce jobs. The taps specify the input and output formats along with the input and output paths. Cascading manages all the intermediate data necessary to get a sequence of MapReduce jobs to communicate. For example, a “GroupBy” followed by an “Every” followed by a “GroupBy” followed by an “Every” followed by an “Each” would translate into the following jobs:
Job 1:
Mapper: Emit first group key for every tuple
Reducer: Apply first “Every” operation in the reducer
Job 2:
Mapper: Emit second group key for every tuple
Reducer:
- Apply second “Every” operation in the reducer
- Apply “Each” operation to each produced tuple from the “Every” function
In this example, there would be a set of intermediate data between the two jobs that would be automatically deleted when the flow completes.
In contrast, an “Each” followed by an “Each” followed by a “GroupBy” followed by an “Every” would produce:
Job 1:
Mapper: Apply first each function. Apply second each function. Emit on group key.
Reducer: Apply “Every” function to tuples to emit output.
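The rule implied by these two examples can be written down as a small plain-Java sketch: operations accumulate in the mapper until a GroupBy is reached, later operations run in the reducer, and the next GroupBy starts a new job. This is a simplified model inferred from the examples, not Cascading's actual planner, and JobPlanSketch and planJobs are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of how a linear pipe assembly maps onto MapReduce jobs.
// NOT Cascading's real planner; JobPlanSketch and planJobs are hypothetical names.
final class JobPlanSketch {

    // Each returned string describes one job as "map[...] reduce[...]".
    static List<String> planJobs(List<String> pipes) {
        List<String> jobs = new ArrayList<>();
        List<String> mapOps = new ArrayList<>();
        List<String> reduceOps = new ArrayList<>();
        boolean inReduce = false;
        for (String pipe : pipes) {
            if (pipe.equals("GroupBy")) {
                if (inReduce) {
                    // A second grouping cannot share a reducer: close the current job.
                    jobs.add("map" + mapOps + " reduce" + reduceOps);
                    mapOps = new ArrayList<>();
                    reduceOps = new ArrayList<>();
                }
                mapOps.add(pipe);  // the mapper emits the group key
                inReduce = true;   // everything after this runs in the reducer
            } else if (inReduce) {
                reduceOps.add(pipe);
            } else {
                mapOps.add(pipe);
            }
        }
        if (!mapOps.isEmpty() || !reduceOps.isEmpty()) {
            jobs.add("map" + mapOps + " reduce" + reduceOps);
        }
        return jobs;
    }

    public static void main(String[] args) {
        // First example from the text: two jobs.
        for (String job : planJobs(Arrays.asList("GroupBy", "Every", "GroupBy", "Every", "Each"))) {
            System.out.println(job);
        }
        // Second example from the text: one job.
        for (String job : planJobs(Arrays.asList("Each", "Each", "GroupBy", "Every"))) {
            System.out.println(job);
        }
    }
}
```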
For operations like joins, Cascading generates all the MapReduce logic needed to perform the join.
The most recognizable competing product to Cascading is Pig, a Yahoo technology we also explored. Pig lets you specify batch queries in a neat SQL-like syntax, but we found Pig unusable for our needs because we could not plug in custom input and output formats. One of the nicest things about Cascading is that it doesn’t restrict you in any way – anything you can do via vanilla MapReduce you can do via Cascading. We like the fact that Cascading flows are all specified via a Java API rather than a SQL-like language – this makes it very natural to create custom functions and very complex workflows. And if some part of your workflow is really performance-critical, Cascading gives you the flexibility to hand-code that part of the workflow with a MapReduce job and plug it in as a custom Flow.
Cascading has saved us a ton of headaches here at Rapleaf. In this post, I’ve only scratched the surface of what’s possible with Cascading. If you’re interested in learning more, be sure to check out the website and the #cascading room on freenode. I would love to see more people get involved in this awesome project!
13 Comments
You should take a look at Yahoo!’s language for MapReduce, Pig. It has the same features for collapsing complexity as Cascading and is also pretty easy to work with.
check out yahoo research’s Pig, which is a framework running on top of hadoop, specifically designed for higher level set-oriented workflows.
http://research.yahoo.com/node/90
Just a few comments about Yahoo Pig:
You can read input files in custom formats, in order to do that you have to implement a StorageFunction (http://wiki.apache.org/pig/StorageFunction) for your specific format.
It will be easier for text based formats (like CSV) but it’s also possible to read binary files. For example see how a file in the Hadoop SequenceFile format can be read with Pig:
http://www.ivanprado.es/2008/02/reading-hadoop-sequencefile-from-pig.html
Have you looked at Apache Pig ( http://incubator.apache.org/pig )? Its convenience is far beyond plain MapReduce and Cascading; it is like SQL.
The “cascading” flows much resemble the internal operations of a clustered multi-terabyte DB engine developed by SenSage (http://www.sensage.com).
The latter product aggregates log data from many sources in a corporate computing environment, compresses and stores this info onto disk in a redundant cluster. Whenever a SQL/Perl/Java query is requested, the system automatically figures how to construct flows similar to the “Cascading” flows, and how to distribute the filtering/aggregating jobs around the cluster. All data is pulled at the lowest level as “streams” from the disk, and filter through the filtering/aggregating nodes all the way up till a final query result is produced out the top.
For patent see http://www.google.com/patents?id=q4d4AAAAEBAJ&dq=sensage.
Thanks, Cascading sounds interesting, will check it out.
P.S. your favicon looks awfully like that of The Onion
@Toby, eugene, Juan: We actually investigated Pig well before we investigated Cascading. The main problem is that we need full control over every aspect of how the data is stored/retrieved – not just the file format. We store our data in a special directory structure and need control down to the level of creating input splits. Cascading literally lets you plug in custom Hadoop input and output formats.
In terms of convenience of syntax, Cascading and Pig are comparable. They both support the same kinds of operations and are easy to use.
Though this all sounds extremely exciting, I’m not sure I will ever need to process enough data to require its use.
Would anyone here care to suggest, in general terms, some real-world application(s) for this new capability?
There are many data-intensive, long running processes in business. A chain of retail stores generates 800 GB of cash register data a week, a media company wants to de-dupe their email, USPS mail, and customer registration master data list, a large oil refinery wants to continually analyze oil pressure & temp used to cool a pump.
Thanks for the very clear introduction; MapReduce is easy “to count words”, but developing and maintaining complex apps is painful… I like this feature “plug in custom Hadoop input and output formats”… I am not familiar with Pig, but after reading your post I feel that Cascading is very natural and simple to understand, and lets you work with a Business Domain Model instead of “thinking in MapReduce” (similar comparison: plain SQL vs. Hibernate/EJB)
“You should take a look at Yahoo!’s language for MapReduce, Pig. It has the same features for collapsing complexity as Cascading and is also pretty easy to work with.”
I disagree. To me it’s a little confusing. This way is better.
thanks for all the great updates
10 Trackbacks
[...] Need to check this out as an alternative to hadoop streaming [...]
[...] interesting post from Nathan Marz regarding an abstraction layer from Chris Wensel called Cascading: We have been [...]
[...] the guys at Concurrent and Scale Unlimited. For an interesting user perspective on Cascading, see Goodbye MapReduce, Hello Cascading. See also Chris Wensel’s comparison of Cascading and [...]