Goodbye MapReduce, Hello Cascading

Posted by Genie (OP) on 2012-08-21 10:32

We have been doing a lot of batch processing with Hadoop MapReduce lately, and we quickly realized how painful it can be to write MapReduce jobs by hand. Some parts of our workflow require up to TEN MapReduce jobs to execute in sequence, requiring a lot of hand-coordination of intermediate data and execution order. Additionally, anyone who has done really complex MapReduce workflows knows how hard it is to keep “thinking” in MR.

Luckily, we discovered a great new open source product called Cascading which has alleviated a ton of our pain. Cascading is the brainchild and work of Chris Wensel, and he's done a great job developing an API which solves many of our problems. Cascading abstracts away MapReduce into a more natural logical model and provides a workflow management layer to handle things like intermediate data and data staleness.

Cascading's logical model abstracts MapReduce away into a convenient model of tuples, pipes, and taps. Data is represented as “Tuples”: a Tuple is a named list of objects. E.g., I can have a tuple (“url”, “stats”), where “url” is a “Text” object and “stats” is my own “UrlStats” complex object, containing methods for getting “numberOfHits” and “averageTimeSpent”. Tuples are kept together in “streams”, and all tuples in a stream have exactly the same fields.
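To make this concrete, here is a minimal sketch of building such a tuple and reading it back by field name. This is my own illustration, assuming a Cascading 2.x-era API and a hypothetical UrlStats class; neither appears verbatim in the original post:

    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;

    public class TupleExample {
        // Hypothetical complex value object carried inside a tuple.
        public static class UrlStats implements java.io.Serializable {
            private final long hits;
            private final double avgTimeSpent;
            public UrlStats(long hits, double avgTimeSpent) {
                this.hits = hits;
                this.avgTimeSpent = avgTimeSpent;
            }
            public long getNumberOfHits() { return hits; }
            public double getAverageTimeSpent() { return avgTimeSpent; }
        }

        public static void main(String[] args) {
            // A stream's schema is declared once as Fields...
            Fields fields = new Fields("url", "stats");
            // ...and each tuple is a positional list of values matching it.
            Tuple tuple = new Tuple("http://example.com", new UrlStats(42, 3.5));

            // A TupleEntry pairs Fields with a Tuple so values can be read by name.
            TupleEntry entry = new TupleEntry(fields, tuple);
            String url = entry.getString("url");
            UrlStats stats = (UrlStats) entry.getObject("stats");
            System.out.println(url + " -> " + stats.getNumberOfHits() + " hits");
        }
    }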

An operation on a stream of tuples is called a “Pipe”. There are a few kinds of pipes, each encompassing a category of transformations on a tuple stream. For example:
- An “Each” pipe applies a custom function to each individual tuple.
- A “GroupBy” pipe groups tuples together by a set of fields.
- An “Every” pipe applies an “aggregator function” to all tuples in a group at once.

Suppose our tuple stream is of the form (“n1”, “n2”) and looks like:

    (1, 2)
    (5, 7)
    (12, 5)
Assume this stream currently exists in a pipe called “workflow”. Now suppose we have a class “Double” that implements “operate”, which will double its argument and output the result in a field called “double”. Look at the following code:
    // "workflow" contains tuples of the form ("n1", "n2")
    workflow = new Each(workflow, new Fields("n1"), new Double(), new Fields("n2", "double"));
What this code does is apply the “Double” operation to each tuple in the stream. The second parameter indicates that the “n1” field should be used as the argument to the function. After the Double function completes, we will have a 3-tuple of the form (“n1”, “n2”, “double”). The last argument indicates that we only want to pass “n2” and “double” on to the next pipe. Running this code on the above example will produce the following tuple stream (with fields “n2”, “double”):
    (2, 2)
    (7, 10)
    (5, 24)
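For completeness, here is a rough sketch of what an operation like “Double” might look like. This is my own reconstruction against the Cascading 2.x BaseOperation/Function signatures, which may differ slightly from the API this post was originally written against:

    import cascading.flow.FlowProcess;
    import cascading.operation.BaseOperation;
    import cascading.operation.Function;
    import cascading.operation.FunctionCall;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;

    // Named "Double" to mirror the post; it shadows java.lang.Double,
    // which is harmless here but worth knowing.
    public class Double extends BaseOperation implements Function {
        public Double() {
            // one argument in, one declared output field ("double") out
            super(1, new Fields("double"));
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            // read the single selected argument field (e.g. "n1") by position
            int value = functionCall.getArguments().getInteger(0);
            // emit the doubled value; Cascading merges it into the stream
            functionCall.getOutputCollector().add(new Tuple(2 * value));
        }
    }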
Let's illustrate “GroupBy” and “Every” with the classic word count example. Suppose we have a pipe called “workflow” which contains a tuple stream of the form (“word”, “count”), where these tuples were generated by counting the number of times a word appeared on each line of a text file.
    workflow = new GroupBy(workflow, new Fields("word"));
    workflow = new Every(workflow, new Fields("count"), new Sum(new Fields("total")), new Fields("word", "total"));
Let's take a look at what this does on an example:
  1. ("banana", 10)
  2. ("rose", 2)
  3. ("sleep", 5)
  4. ("rose", 7)
  5. ("rose", 10)
  6. ("banana", 2)
复制代码
The GroupBy step will emit the following “group stream”:
  1. "banana":
  2.   ("banana", 10)
  3.   ("banana", 2)
  4. "rose":
  5.   ("rose", 2)
  6.   ("rose", 7)
  7.   ("rose", 10)
  8. "sleep":
  9.   ("sleep", 6)
复制代码
The Every step will collapse these tuples into the form (“word”, “total”) and produce:
  1. ("banana", 12)
  2. ("rose", 19)
  3. ("sleep", 6)
复制代码
As in the Each example, the second line of code indicates that the “count” field should be summed over. The Fields declaration inside the “Sum” aggregator indicates that the output should be named “total”. Finally, the last Fields declaration indicates that “word” and “total” should be passed along to the next pipe.
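As an aside, here is a minimal end-to-end sketch of how word counts could be produced straight from lines of text, rather than starting from pre-counted (“word”, “count”) tuples. It is my own illustration using Cascading's built-in RegexSplitGenerator and Count aggregator, which the example above does not use:

    import cascading.operation.aggregator.Count;
    import cascading.operation.regex.RegexSplitGenerator;
    import cascading.pipe.Each;
    import cascading.pipe.Every;
    import cascading.pipe.GroupBy;
    import cascading.pipe.Pipe;
    import cascading.tuple.Fields;

    public class WordCountAssembly {
        // Assumes an upstream tap supplies tuples with a single "line" field.
        public static Pipe build() {
            Pipe assembly = new Pipe("wordcount");

            // Split each line into one tuple per word.
            assembly = new Each(assembly, new Fields("line"),
                new RegexSplitGenerator(new Fields("word"), "\\s+"));

            // Group identical words, then count the tuples in each group,
            // passing ("word", "total") downstream.
            assembly = new GroupBy(assembly, new Fields("word"));
            assembly = new Every(assembly, new Count(new Fields("total")),
                new Fields("word", "total"));
            return assembly;
        }
    }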

One of the most powerful features of Cascading is the ability to fork and merge pipes together.
E.g., given one pipe with tuples of the form (“customer_id”, “name”) and another pipe of the form (“cust_id”, “age”), Cascading makes it super easy to join these pipes together to get tuples of the form (“name”, “age”). The operation is called “CoGroup” and can do inner, outer, or mixed joins. For this example, the code would look like:
    Pipe namePipe; // this contains ("customer_id", "name")
    Pipe agePipe;  // this contains ("cust_id", "age")

    // do the join
    Pipe workflow = new CoGroup(namePipe, new Fields("customer_id"),
        agePipe, new Fields("cust_id"),
        new Fields("id1", "name", "id2", "age"));

    // strip away the "id" fields
    workflow = new Each(workflow, new Fields("name", "age"), new Identity());
Once you have constructed your operations into a “pipe assembly”, you then tell Cascading how to retrieve and persist the data using an abstraction called “Tap”. “Taps” know how to convert stored data into Tuples and vice versa, and have complete control over how and where the data is stored. Cascading has a lot of built-in taps – using SequenceFiles and Text formats via HDFS are two examples. If you want to store data in your own format, you can define your own Tap. We have done this here at Rapleaf and it has worked seamlessly.

Once your taps are defined, you hook up one Tap as the “source” of your pipe assembly and another Tap as the “sink”. This creates a “Flow”. Running the flow will read the input set of tuples from the source Tap, run the tuples through the pipe assembly, and write the final output tuples into the sink Tap.
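As a rough sketch of how this wiring looks in code (assuming Cascading 2.x's Hadoop taps and flow connector, a later API than the one available when this was written, and placeholder HDFS paths):

    import java.util.Properties;

    import cascading.flow.Flow;
    import cascading.flow.hadoop.HadoopFlowConnector;
    import cascading.pipe.Pipe;
    import cascading.scheme.hadoop.TextLine;
    import cascading.tap.Tap;
    import cascading.tap.hadoop.Hfs;

    public class FlowExample {
        public static void run(Pipe assembly) {
            // Source tap: read text lines from HDFS as ("offset", "line") tuples.
            Tap source = new Hfs(new TextLine(), "hdfs://namenode/input/path");
            // Sink tap: write the assembly's final tuples back out as text.
            Tap sink = new Hfs(new TextLine(), "hdfs://namenode/output/path");

            // Connect source -> pipe assembly -> sink into a Flow and run it.
            Flow flow = new HadoopFlowConnector(new Properties())
                .connect(source, sink, assembly);
            flow.complete(); // blocks until the underlying MR jobs finish
        }
    }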

Internally, Cascading translates the pipe assembly into a series of MapReduce jobs. The taps specify the input and output formats along with the input and output paths. Cascading manages all the intermediate data necessary to get a sequence of MapReduce jobs to communicate. For example, a “GroupBy” followed by an “Every” followed by a “GroupBy” followed by an “Every” followed by an “Each” would translate into the following jobs:
    Job 1:
      Mapper: Emit the first group key for every tuple
      Reducer: Apply the first “Every” operation

    Job 2:
      Mapper: Emit the second group key for every tuple
      Reducer: Apply the second “Every” operation, then apply the “Each” operation to each tuple produced by the “Every” function

In this example, there would be a set of intermediate data between the two jobs that is automatically deleted when the flow completes.

In contrast, an “Each” followed by an “Each” followed by a “GroupBy” followed by an “Every” would produce:

    Job 1:
      Mapper: Apply the first “Each” function, then the second “Each” function, and emit on the group key
      Reducer: Apply the “Every” function to the grouped tuples to emit the output
For operations like joins, Cascading generates all the MapReduce logic necessary to actually perform the join.
The most recognizable competitor to Cascading is Pig, a Yahoo! technology we also explored. Pig lets you specify batch queries in a neat SQL-like syntax, but we found it unusable due to the inability to plug in custom input and output formats. One of the nicest things about Cascading is that it doesn't restrict you in any way: anything you can do via vanilla MapReduce, you can do via Cascading. We like the fact that Cascading flows are specified via a Java API rather than a SQL-like language, which makes it very natural to create custom functions and very complex workflows. And if some part of your workflow is really performance-critical, Cascading gives you the flexibility to hand-code that part as a MapReduce job and plug it in as a custom Flow.

If you’re interested in learning more, be sure to check out the website and the #cascading room on freenode.
Reply #1, from reck, posted 2012-09-06 15:05:15
From my experience applying it in practice, Cascading still needs more work. Most of the time it makes MR programming simpler, but sometimes you end up having to implement Cascading's interfaces yourself. Cascading is positioned more at the workflow-development level, and it presumes the developer pays close attention to, and understands, the reduce (convergence) phase of MR.

For example, consider this SQL:

    select a, count(*) from (select a, b, sum(c) from t1 group by a, b) t2 group by a

How many MR jobs will Hadoop split this into? From an optimization standpoint, of course, the fewer MR passes the better, since that reduces intermediate-stage IO costs. But with SQL like this, once sum and count aggregations appear, the shuffle phase's partitioner clearly has to route the target data to designated reducers, and the computation can only be performed in the reduce phase. In other words, the map phase is not suited to cross-row computation; most of the time it can only do row-by-row processing, and cross-row aggregation has to be done in the reduce phase. And once the sum runs in the reduce phase, the count(*) that follows in the SQL must, in many cases, be pushed into the next MR job.

Cascading is a mechanism wrapped around the Hadoop MR framework, built on exactly this kind of understanding of how MR works; at its core, the code still runs on the Hadoop map/reduce framework.
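To make that concrete, here is my own rough sketch (not from the reply) of that SQL as a Cascading pipe assembly; the two GroupBy/Every pairs are exactly what forces the planner to chain two MR jobs:

    import cascading.operation.aggregator.Count;
    import cascading.operation.aggregator.Sum;
    import cascading.pipe.Every;
    import cascading.pipe.GroupBy;
    import cascading.pipe.Pipe;
    import cascading.tuple.Fields;

    public class TwoJobQuery {
        // Assumes t1 supplies tuples of the form ("a", "b", "c").
        public static Pipe build() {
            Pipe pipe = new Pipe("t1");

            // inner query: select a, b, sum(c) from t1 group by a, b
            pipe = new GroupBy(pipe, new Fields("a", "b"));
            pipe = new Every(pipe, new Fields("c"),
                new Sum(new Fields("sum_c")), new Fields("a", "b", "sum_c"));

            // outer query: select a, count(*) ... group by a
            // A second grouping after an aggregation cannot share the first
            // job's reduce phase, so Cascading plans a second MR job here.
            pipe = new GroupBy(pipe, new Fields("a"));
            pipe = new Every(pipe, new Count(new Fields("cnt")),
                new Fields("a", "cnt"));
            return pipe;
        }
    }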

The attachment is my trace of Cascading's MR invocation process, based on an analysis of its source code.

Reply #2, posted 2012-09-06 20:17:53
> reck, 2012-09-06 15:05: "From my experience applying it in practice, Cascading still needs more work. Most of the time it makes MR programming simpler, but sometimes ..."

Welcome~~~

Reply #3, from Genie (OP), posted 2012-09-07 12:31:59
> reck, 2012-09-06 15:05: "From my experience applying it in practice, Cascading still needs more work. Most of the time it makes MR programming simpler, but sometimes ..."

Yes, just as Pig and Hive are also wrappers over MR. Could you let me know your QQ (mine is 597271912, or just add me)? I'd like to be able to ask you questions in the future.