Hadoop MapReduce

Hadoop之MapReduce

Hadoop之MapReduce

简介

MapReduce是Google提出的分布式并行计算框架,主要用于搜索领域,解决海量数据的计算问题。

组成

MapReduce由Map和Reduce两个部分组成,只需要编写相应的Map和Reduce函数,其他部分都是套路。

执行步骤

  1. map任务处理
    1.1 读取输入文件内容,解析成key=>value对。对输入文件的每一行,解析成键值对。每一个键值对调用一次map函数。
    1.2 根据实际需要的逻辑对输入的键值对进行处理,转换成新的键值对输出。
    1.3 对输出的key=>value对进行分区。
    1.4 对不同分区的数据,按照key进行排序、分组。相同的key的value放到一个集合中。
    1.5 (可选)分组后的数据进行规约(combine)。
  2. reduce任务处理
    2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
    2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的键值对处理,转换成新的键值对输出。
    2.3 将reduce的输出保存到文件中。

一个形象的比喻

一张图理清MapReduce流程

经典例子 WordCount

输入:

hadoop mapreduce
hadoop hive
hadoop hbase
spark sqoop
mapreduce hive
python sklearn
hbase mongodb
hadoop hdfs

输出:

hadoop 4
hbase 2
hdfs 1
hive 2
mapreduce 2
mongodb 1
python 1
sklearn 1
spark 1
sqoop 1

  • 先编写map类
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public TokenizerMapper() {}

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());

            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }

        }
    }

TokenizerMapper类继承自Mapper类,该泛型类的前两个类型代表输入键值对的类型,后两个是输出键值对的类型。将每一行的每一个词A,都转成(A=>1)的形式输出。
map函数三个参数,输入的key,value,和一个Mapper.Context类型的context对象,用于输出每一个处理后的键值对。

  • 然后编写reduce类
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public IntSumReducer() {}

        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;

            IntWritable val;
            for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable)i$.next();
            }

            this.result.set(sum);
            context.write(key, this.result);
        }
    }

同样IntSumReducer类继承自Reducer这个泛型类,四个参数类型,前两个是map输出即reduce输入的形式,后两个是reducer输出的形式。逻辑很简单,用一个迭代器,累加每个key值对应的value即次数。最后通过context输出到文件。

- main函数就是很套路的东西

    public static void main(String[] args) throws Exception {
        //配置
        Configuration conf = new Configuration();
        //这里采用读取参数的方式确定输入输出目录(参数分别为input, output)
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        //参数异常处理
        if(otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        //生成一个Job对象,即一次mapreduce作业
        Job job = Job.getInstance(conf, "word count");
        //分别把job的各个属性设置成编写好的类
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置文件读入路径
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        //设置文件输出路径
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        //创建文件系统类删除已存在的输出目录
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outputPath, true);

        System.exit(job.waitForCompletion(true)?0:1);
    }

发表评论

电子邮件地址不会被公开。 必填项已用*标注