We have been doing a lot of batch processing with Hadoop MapReduce lately, and we quickly realized how painful it can be to write MapReduce jobs by hand. Some parts of our workflow require up to TEN MapReduce jobs to execute in sequence, requiring a lot of hand-coordination of intermediate data and execution order. Additionally, anyone who has done really complex MapReduce workflows knows how hard it is to keep “thinking” in MR.
Luckily, we discovered a great new open source product called Cascading which has alleviated a ton of our pain. Cascading is the brainchild and work of Chris Wensel, and he's done a great job developing an API which solves many of our problems. Cascading abstracts away MapReduce into a more natural logical model and provides a workflow management layer to handle things like intermediate data and data staleness.
Cascading's logical model abstracts away MapReduce into a convenient tuples, pipes, and taps model. Data is represented as "Tuples": a Tuple is a named list of objects.
For example, I can have a tuple ("url", "stats"), where "url" is a "Text" object and "stats" is my own "UrlStats" complex object, containing methods for getting "numberOfHits" and "averageTimeSpent". Tuples are kept together in "streams", and all tuples in a stream have the exact same fields.
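As a rough illustration of that model, here is what constructing such a tuple might look like in code. This is a sketch only: "UrlStats" is the hypothetical class from the example above, the URL is made up, and the exact Fields/Tuple/TupleEntry constructors vary a little between Cascading versions.

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

// Field names for the tuple stream described above.
Fields fields = new Fields("url", "stats");

// One tuple in that stream: values are positional and line up with the field names.
// "UrlStats" is the hypothetical user-defined class mentioned in the example.
Tuple tuple = new Tuple("http://example.com/some/page", new UrlStats());

// A TupleEntry pairs the field names with the values, so downstream operations
// can look values up by name instead of by position.
TupleEntry entry = new TupleEntry(fields, tuple);
String url = entry.getString("url");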
An operation on a stream of tuples is called a “Pipe”. There are a few kinds of pipes, each encompassing a category of transformations on a tuple stream.
For example:
An "Each" pipe will apply a custom function to each individual tuple.
A "GroupBy" pipe will group tuples together by a set of fields.
An "Every" pipe will apply an "aggregator function" to all tuples in a group at once.
Suppose our tuple stream is of the form ("n1", "n2"), and assume this stream currently exists in a pipe called "workflow". Now suppose we have a class "Double" that implements "operate", doubles its argument, and outputs the result in a field called "double". Look at the following code:

// "workflow" contains tuples of the form ("n1", "n2")
workflow = new Each(workflow, new Fields("n1"), new Double(), new Fields("n2", "double"));
What this code does is apply the "Double" operation to each tuple in the stream. The second parameter indicates that the "n1" field should be used as the argument to the function. After the Double function completes, we have a 3-tuple of the form ("n1", "n2", "double"). The last argument indicates that only "n2" and "double" should be passed on to the next pipe, so the resulting stream has fields ("n2", "double").
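The post doesn't show the "Double" class itself, so here is a minimal sketch of what such a function might look like against Cascading's BaseOperation/Function API. Treat it as an illustration rather than the author's actual code; exact signatures differ slightly between Cascading versions.

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Sketch of the "Double" function referenced above: it takes one argument field
// and emits a single output field called "double" containing twice the value.
public class Double extends BaseOperation implements Function {
  public Double() {
    // one incoming argument, one declared output field
    super(1, new Fields("double"));
  }

  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    long n = functionCall.getArguments().getTuple().getLong(0);
    functionCall.getOutputCollector().add(new Tuple(n * 2));
  }
}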
Next, let's illustrate "GroupBy" and "Every" with the classic word count example. Suppose we have a pipe called "workflow" which contains a tuple stream of the form ("word", "count"), where these tuples were generated by counting the number of times a word appeared on each line of a text file.

workflow = new GroupBy(workflow, new Fields("word"));
workflow = new Every(workflow, new Fields("count"), new Sum(new Fields("total")), new Fields("word", "total"));
Let's take a look at what this does on an example. Suppose the stream contains the following tuples:

("banana", 10)
("rose", 2)
("sleep", 6)
("rose", 7)
("rose", 10)
("banana", 2)
The GroupBy step will emit the following "group stream":

"banana":
    ("banana", 10)
    ("banana", 2)
"rose":
    ("rose", 2)
    ("rose", 7)
    ("rose", 10)
"sleep":
    ("sleep", 6)
The Every step will collapse these tuples into the form ("word", "total") and produce:

("banana", 12)
("rose", 19)
("sleep", 6)
Like the Each example, the second line of code indicates that the "count" field should be summed over. The Fields declaration inside the "Sum" function indicates that the output field should be named "total". Finally, the last Fields declaration indicates that "word" and "total" should be passed along to the next pipe.
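Sum ships with Cascading, but it is worth seeing roughly what the kind of "aggregator function" an "Every" pipe applies looks like. Below is a sketch of a Sum-like aggregator; the class name "SumTotal" is made up and the Aggregator/AggregatorCall signatures are based on the generic API of later Cascading releases, so treat the details as assumptions.

import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Sketch of a Sum-like aggregator: for each group it accumulates the single
// argument field and, once the group is exhausted, emits one tuple with the total.
public class SumTotal extends BaseOperation<long[]> implements Aggregator<long[]> {
  public SumTotal(Fields fieldDeclaration) {
    super(1, fieldDeclaration); // one argument in, e.g. new Fields("total") out
  }

  public void start(FlowProcess flowProcess, AggregatorCall<long[]> call) {
    call.setContext(new long[] { 0L }); // reset the running total for each new group
  }

  public void aggregate(FlowProcess flowProcess, AggregatorCall<long[]> call) {
    call.getContext()[0] += call.getArguments().getTuple().getLong(0);
  }

  public void complete(FlowProcess flowProcess, AggregatorCall<long[]> call) {
    call.getOutputCollector().add(new Tuple(call.getContext()[0]));
  }
}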
One of the most powerful features of Cascading is the ability to fork and merge pipes together.
For example, suppose we have one pipe with tuples of the form ("customer_id", "name") and another pipe of the form ("cust_id", "age"). Cascading makes it super easy to join these pipes together to get tuples of the form ("name", "age"). The operation is called "CoGroup" and can do inner, outer, or mixed joins. For this example, the code would look like:

Pipe namePipe; // this contains ("customer_id", "name")
Pipe agePipe;  // this contains ("cust_id", "age")

// do the join
Pipe workflow = new CoGroup(namePipe, new Fields("customer_id"),
    agePipe, new Fields("cust_id"), new Fields("id1", "name", "id2", "age"));

// strip away the "id" fields
workflow = new Each(workflow, new Fields("name", "age"), new Identity());
Once you have constructed your operations into a "pipe assembly", you then tell Cascading how to retrieve and persist the data using an abstraction called a "Tap". Taps know how to convert stored data into Tuples and vice versa, and have complete control over how and where the data is stored. Cascading has a lot of built-in taps – taps that read and write SequenceFile and text formats via HDFS are two examples. If you want to store data in your own format, you can define your own Tap. We have done this here at Rapleaf and it has worked seamlessly.
Once your taps are defined, you hook up one Tap as the "source" to your pipe assembly and another Tap as the "sink". This creates a "Flow". Running the flow will read the input set of tuples from the source Tap, run the tuples through the pipe assembly, and write the final output tuples into the sink Tap.
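As a rough sketch of what wiring a source, a sink, and a pipe assembly into a Flow might look like: the paths below are made up, "workflow" is the tail pipe of the assembly built earlier, and the classes used (Hfs, TextLine, FlowConnector) are from the Cascading 1.x API that was current when this was written – later versions moved to HadoopFlowConnector, so treat the exact calls as assumptions.

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

// Source tap: read lines of text from HDFS (paths here are hypothetical).
Tap source = new Hfs(new TextLine(new Fields("offset", "line")), "input/path");

// Sink tap: write the final tuples back out as text (the output path must not already exist).
Tap sink = new Hfs(new TextLine(), "output/path");

// Hook source -> pipe assembly -> sink into a Flow. Completing the flow plans and
// runs the underlying MapReduce jobs, including any intermediate data handling.
Flow flow = new FlowConnector(new Properties()).connect(source, sink, workflow);
flow.complete();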
Internally, Cascading translates the pipe assembly into a series of MapReduce jobs. The taps specify the input and output formats along with the input and output paths. Cascading manages all the intermediate data necessary to get a sequence of MapReduce jobs to communicate. For example, a "GroupBy" followed by an "Every" followed by a "GroupBy" followed by an "Every" followed by an "Each" would translate into the following jobs:

Job 1:
    Mapper: emit the first group key for every tuple
    Reducer: apply the first "Every" operation

Job 2:
    Mapper: emit the second group key for every tuple
    Reducer:
        apply the second "Every" operation
        apply the "Each" operation to each tuple produced by the "Every" function

In this example, there would be a set of intermediate data between the two jobs that would be automatically deleted when the flow completes.

In contrast, an "Each" followed by an "Each" followed by a "GroupBy" followed by an "Every" would produce:

Job 1:
    Mapper: apply the first "Each" function, apply the second "Each" function, and emit the group key
    Reducer: apply the "Every" function to the grouped tuples to emit the output
For operations like joins, Cascading automates all the MapReduce logic necessary to actually perform the join.
The most recognizable competing product to Cascading is Pig, a Yahoo technology we also explored. Pig lets you specify batch queries in a neat SQL-like syntax, but we found Pig unusable due to the inability to plug in custom input and output formats. One of the nicest things about Cascading is that it doesn't restrict you in any way – anything you can do via vanilla MapReduce you can do via Cascading. We like the fact that Cascading flows are all specified via a Java API rather than a SQL-like language – this makes it very natural to create custom functions and very complex workflows. And if some part of your workflow is really performance-critical, Cascading gives you the flexibility to hand-code that part as a MapReduce job and plug it in as a custom Flow.
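That last point deserves a sketch. Cascading 1.x ships a MapReduceFlow class that wraps a plain Hadoop JobConf as a Flow, which can then be scheduled alongside regular flows in a Cascade. The constructor arguments, the "MyHandTunedJob" class, and the reuse of the source/sink/workflow names from the sketch above are all assumptions for illustration.

import org.apache.hadoop.mapred.JobConf;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.MapReduceFlow;

// A hand-written, performance-critical MapReduce job, configured the usual Hadoop way.
// ("MyHandTunedJob" and its mapper/reducer setup are hypothetical.)
JobConf jobConf = new JobConf(MyHandTunedJob.class);
// ... set mapper, reducer, input and output paths on jobConf as usual ...

// Wrap the raw job as a Flow so Cascading can schedule it.
Flow handTunedFlow = new MapReduceFlow("hand-tuned step", jobConf);

// A normal Cascading flow built from taps and a pipe assembly, as shown earlier.
Flow cascadingFlow = new FlowConnector().connect(source, sink, workflow);

// A Cascade orders the flows by their data dependencies and runs them together.
Cascade cascade = new CascadeConnector().connect(cascadingFlow, handTunedFlow);
cascade.complete();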
If you're interested in learning more, be sure to check out the website and the #cascading room on freenode.