MapReduce的入门

时间:2019-01-11 20:38:58   收藏:0   阅读:208
1. MapReduce 的介绍:

   MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
  MapReduce大体上分三个部分:
  - MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行
  - MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild
  - ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild

2.MapReduce编写代码的流程:

3.WordCount 案例:

public class MyWordCount {
    public static void main(String[] args) {
        // 指定 hdfs 相关的参数
        Configuration conf=new Configuration(true);
        conf.set("fs.defaultFS","hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            // 新建一个 job 任务
            Job job=Job.getInstance(conf);
            // 设置 jar 包所在路径
           job.setJarByClass(MyWordCount.class);
            // 指定 mapper 类和 reducer 类
            job.setMapperClass(Mapper.class);
            job.setReducerClass(MyReduce.class);

            // 指定 maptask 的输出类型,注意,如果maptask的输出类型与reducetask输出类型一样,mapTask可以不用设置
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 指定 reducetask 的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // 指定该 mapreduce 程序数据的输入和输出路径
            Path input=new Path("/data/input");
            Path output =new Path("/data/output");
            //一定要保证output不存在
            if(output.getFileSystem(conf).exists(output)){
                output.getFileSystem(conf).delete(output,true);  //递归删除
            }
            FileInputFormat.addInputPath(job,input);
            FileOutputFormat.setOutputPath(job,output);

            // 最后提交任务
             boolean success = job.waitForCompletion(true);
             System.exit(success?0:-1);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
    private class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        Text mk =new Text();
        IntWritable mv=new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对
             String[] words = value.toString().split("\\s+");
             for(String word:words){
                 mk.set(word);
                 context.write(mk,mv);
             }
        }
    }
    private class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
        IntWritable mv=new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum=0;
            // 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计
            for(IntWritable value:values){
                sum+=value.get();
            }
            mv.set(sum);
            context.write(key,mv);
        }
    }
}

4. MapReduce 程序的核心运行机制:

1)MapReduce 程序的运行流程:

原文:http://blog.51cto.com/14048416/2341806

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!