机器学习和生物信息学实验室联盟

 找回密码
 注册

QQ登录

只需一步,快速开始

搜索
查看: 3966|回复: 2
打印 上一主题 下一主题

MapReduce最经典入门代码

[复制链接]
跳转到指定楼层
楼主
发表于 2012-6-10 11:16:53 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
本帖最后由 Fth-Hokage 于 2012-6-10 11:22 编辑
  1. import java.io.*;  
  2. import java.util.*;  
  3.   
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.filecache.DistributedCache;  
  6. import org.apache.hadoop.conf.*;  
  7. import org.apache.hadoop.io.*;  
  8. import org.apache.hadoop.mapred.*;  
  9. import org.apache.hadoop.util.*;  
  10.   
  11. public class WordCount extends Configured implements Tool {  
  12.   
  13.     public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {  
  14.   
  15.       static enum Counters { INPUT_WORDS }  
  16.   
  17.       private final static IntWritable one = new IntWritable(1);  
  18.       private Text word = new Text();  
  19.   
  20.       private boolean caseSensitive = true;  
  21.       private Set<String> patternsToSkip = new HashSet<String>();  
  22.   
  23.       private long numRecords = 0;  
  24.       private String inputFile;  
  25.   
  26.       public void configure(JobConf job) {  
  27.         caseSensitive = job.getBoolean("wordcount.case.sensitive", true);  
  28.         inputFile = job.get("map.input.file");  
  29.   
  30.         if (job.getBoolean("wordcount.skip.patterns", false)) {  
  31.           Path[] patternsFiles = new Path[0];  
  32.           try {  
  33.             patternsFiles = DistributedCache.getLocalCacheFiles(job);  
  34.           } catch (IOException ioe) {  
  35.             System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));  
  36.           }  
  37.           for (Path patternsFile : patternsFiles) {  
  38.             parseSkipFile(patternsFile);  
  39.           }  
  40.         }  
  41.       }  
  42.   
  43.       private void parseSkipFile(Path patternsFile) {  
  44.         try {  
  45.           BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));  
  46.           String pattern = null;  
  47.           while ((pattern = fis.readLine()) != null) {  
  48.             patternsToSkip.add(pattern);  
  49.           }  
  50.         } catch (IOException ioe) {  
  51.           System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));  
  52.         }  
  53.       }  
  54.   
  55.       public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {  
  56.         String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();  
  57.   
  58.         for (String pattern : patternsToSkip) {  
  59.           line = line.replaceAll(pattern, "");  
  60.        }  

  61.         StringTokenizer tokenizer = new StringTokenizer(line);  
  62.         while (tokenizer.hasMoreTokens()) {  
  63.           word.set(tokenizer.nextToken());  
  64.           output.collect(word, one);  
  65.           reporter.incrCounter(Counters.INPUT_WORDS, 1);  
  66.         }  
  67.   
  68.          if ((++numRecords % 100) == 0) {  
  69.          reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);  
  70.         }  
  71.      }  
  72.     }  
  73.   
  74.     public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {  
  75.       public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {  
  76.         int sum = 0;  
  77.         while (values.hasNext()) {  
  78.           sum += values.next().get();  
  79.         }  
  80.        output.collect(key, new IntWritable(sum));  
  81.      }  
  82.     }  
  83.   
  84.     public int run(String[] args) throws Exception {  
  85.       JobConf conf = new JobConf(getConf(), WordCount.class);  
  86.       conf.setJobName("wordcount");  
  87.   
  88.       conf.setOutputKeyClass(Text.class);  
  89.       conf.setOutputValueClass(IntWritable.class);  
  90.   
  91.       conf.setMapperClass(Map.class);  
  92.       conf.setCombinerClass(Reduce.class);  
  93.       conf.setReducerClass(Reduce.class);  
  94.   
  95.       conf.setInputFormat(TextInputFormat.class);  
  96.       conf.setOutputFormat(TextOutputFormat.class);  

  97.       List<String> other_args = new ArrayList<String>();  
  98.       for (int i=0; i < args.length; ++i) {  
  99.         if ("-skip".equals(args[i])) {  
  100.           DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);  
  101.           conf.setBoolean("wordcount.skip.patterns", true);  
  102.         } else {  
  103.           other_args.add(args[i]);  
  104.         }  
  105.       }  
  106.   
  107.       FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));  
  108.       FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));  
  109.   
  110.       JobClient.runJob(conf);  
  111.       return 0;  
  112.     }  
  113.   
  114.      public static void main(String[] args) throws Exception {  
  115.       int res = ToolRunner.run(new Configuration(), new WordCount(), args);  
  116.       System.exit(res);  
  117.     }  
  118. }  
复制代码
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享
回复

使用道具 举报

沙发
发表于 2012-6-11 18:10:55 | 只看该作者
DistributedCache 文件路径有没有问题??
回复 支持 反对

使用道具 举报

板凳
 楼主| 发表于 2012-6-18 10:04:30 | 只看该作者
Genie 发表于 2012-6-11 18:10
DistributedCache 文件路径有没有问题??

哦 没有问题啊 这个程序运行的一个示例为:
bin/hadoop jar ~/wordcount.jar WordCount -D wordcount.case.sensitive=true wordcount/input wordcount/output -skip /user/joe/wordcount/patterns.txt
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

机器学习和生物信息学实验室联盟  

GMT+8, 2024-6-2 18:24 , Processed in 0.070617 second(s), 18 queries .

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表