Hadoop (六):MapReduce基本使用

时间:2020-04-05 02:10:17   收藏:0   阅读:75

MapReduce原理

背景

MapReduce概述

工作流程

技术分享图片

 

 

 

  1. 数据上传到HDFS

  2. shell向Resource Manager发送计算请求。

  3. Node Manager通过心跳,向Resource Manager领取计算任务。

  4. Resource Manager分配资源,开始执行输入(InputFormat),先对文件进行分片,然后读取数据输入到Map中。

  5. Mapper读取输入内容,解析成键值对,1行内容解析成1个键值对,每个键值对调用一次map方法。

  6. 每个键值对执行map重写的方法,把输入的键值对转换成新的键值对。

  7. 多个Mapper的输出,按照不同的分区,通过网络复制到不同的Reducer节点。

    • Map shuffle阶段。

    • Reduce shuffle阶段

  8. 对多个Mapper的输出进行合并、排序,执行重写的reduce方法,再次输出新的键值对。

  9. 把最后的结果保存到文件中。

技术分享图片

 

 

 

第一个程序

实现功能:统计1个文档里出现了多少个单词,每个单词的个数是多少。

思路:

技术分享图片

 

 

 

代码

package com.rzp.utils;
?
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
?
import java.io.IOException;
?
?
//继承Mapper类,泛型前两个为输入的键值对的泛型,后两个是输出的键值对泛型
//LongWritable是long对应Hadoop的序列化的类型
//Text是String对应Hadoop的序列化类型
//Hadoop的序列化机制和java不一样,所以要使用Hadoop特定的类型
//Mapper读取数据时是一行一行的读取
//输入的key(KEYIN)表示每一行的起始的字节数
//输入的value(VALUEIN)表示一行的内容
//输出key(KEYOUT)的是不同的单词
//输出的value(VALUEOUT)是1,用于后续统计累加
public class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
?
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            //使用context类,做一个键值对的输出
            context.write(new Text(word),new LongWritable(1));
        }
    }
}
?

 

package com.rzp.utils;
?
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
?
import java.io.IOException;
/**
 * KEYIN:Reducer阶段输入的数据类型就是Mapper输出的key类型
 * VALUEIN:Reducer阶段输入的数据类型就是Mapper输出的value类型
 * KEYOUT:Reducer阶段输出的数据key类型,本案例中就是单词Text
 * VALUEOUT:Reducer阶段输出的value的数据类型,本案例中就是LongWritable
 *
 * Reducer接收所有来自Mapper处理的数据后,按照key的字典进行排序
 * <hello,1>,<tom,1>,<hello,1>....
 * 排序后
 * <hello,1>,<hello,1>.....
 * 按照key是否相同,作为一组调用reduce方法
 * 每一组的value作为一个迭代器传入reduce方法
 */
?
public class WCReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //定义一个计算器
        int count = 0;
        for (LongWritable value : values) {
            //Hadoop的基础数据类型提供了get方法,可以直接获取Java的基础数据类型
            count+= value.get();
        }
        //Hadoop的基础数据类型构造器可以通过输入Java的基础数据类型来实例化
        context.write(key,new LongWritable(count));
    }
}

 


?
package com.rzp.utils;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
?
?
import java.io.IOException;
?
public class WordCount {
?
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //通过Job这个类封装本次MR要执行的任务
        Job job = Job.getInstance(new Configuration());
        System.setProperty("HADOOP_USER_NAME","root");
?
        //指定本次jap的jar包运行的主类
        job.setJarByClass(WordCount.class);
?
        //设置Mapper相关属性
        //指定本次的Mapper类
        job.setMapperClass(WCMapper.class);
        //指定Mapper输出的 k,v的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //指定本次执行的数据来源
        //默认在hdfs文件系统的根路径下,以后可以通过main输入的args来使用
        FileInputFormat.setInputPaths(job,new Path("/words.txt"));
?
        //设置Reducer相关属性
        //指定本次的Reducer类
        job.setReducerClass(WCReducer.class);
        //指定Reducer输出的 k,v的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //指定本次执行最终结果的输出地址
        FileOutputFormat.setOutputPath(job,new Path("/wcout"));
        //job.submit();提交job,但是一般不用
        //这个方法会提交并且打印日志
        job.waitForCompletion(true);
    }
?
?
}

 


?

集群运行模式

  1. 把MapReduce程序达成jar包,提交给yarn集群,分发到节点上并发执行。

  2. 数据的处理和输出结果都位于HDFS文件系统

技术分享图片

 

 

 技术分享图片

 

 

 技术分享图片

 

 

 

 

 

 

 

hadoop jar xxxx.jar

技术分享图片

 

 

 

本地运行模式

  1. MapReduce程序是提交给LocalJobRunner在本地以单进程的形式运行

  2. 而处理的数据和输出结果可以在本地文件系统,也可以在HDFS上

  3. 要实现本地运行,要写一个程序,不要带集群的配置文件(core-site.xml要删除掉)

  4. 并且配置本地执行:

        conf.set("mapreduce.framework.name","local");

 

 
package com.rzp.utils;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
?
?
import java.io.IOException;
?
public class WordCount {
?
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //通过Job这个类封装本次MR要执行的任务
        Configuration conf = new Configuration();
        
        conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);
?
        System.setProperty("HADOOP_USER_NAME","root");
?
        //指定本次jap的jar包运行的主类
        job.setJarByClass(WordCount.class);
?
        //设置Mapper相关属性
        //指定本次的Mapper类
        job.setMapperClass(WCMapper.class);
        //指定Mapper输出的 k,v的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //指定本次执行的数据来源
        //默认在hdfs文件系统的根路径下,以后可以通过main输入的args来使用
        //和远程不同,input要指定到文件名
        FileInputFormat.setInputPaths(job,new Path("D:\\Hoptest\\input\\1.txt"));
?
        //设置Reducer相关属性
        //指定本次的Reducer类
        job.setReducerClass(WCReducer.class);
        //指定Reducer输出的 k,v的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //指定本次执行最终结果的输出地址
        FileOutputFormat.setOutputPath(job,new Path("D:\\Hoptest\\output"));
        //job.submit();提交job,但是一般不用
        //这个方法会提交并且打印日志
//        job.submit();
        job.waitForCompletion(true);
    }
?
?
}
?

 


  1. 配置本地的Hadoop库(不需完整安装,但是要有环境支持)

    下载文件

https://github.com/speedAngel/hadoop2.7.7

  1. 解压到任意路径,没有中文字符和空格

技术分享图片

 

 

 

  1. 把解压包的bin替换到解压路径

  2. 把bin中的Hadoop.dll复制到C:\Windows\System32

  3. 配置环境变量

HADOOP_HOME  D:\Environment\hadoop-2.7.7
HADOOP_CONF_DIR  D:\Environment\hadoop-2.7.7\etc\hadoop
YARN_CONF_DIR  %HADOOP_CONF_DIR%
PATH  %HADOOP_HOME%\bin

 

  1. 运行测试

技术分享图片

 

 

 

MapReduce流程

Mapper任务执行过程详解

l 第一阶段是把输入文件逻辑切片

l 第二阶段是对切片中的数据解析成<key,value>对

l 第三阶段是调用Mapper类中的map方法

l 第四阶段是按照Reducer的数量进行分区

l 第五阶段是对每个分区中的键值对进行排序

l 第六阶段是对数据进行局部聚合处理(combiner)。

Reducer任务执行过程详解

l 第一阶段是Reducer任务会主动复制Mapper输出的键值对到本地。

l 第二阶段合并复制后的数据并排序

l 第三阶段调用reduce方法

Reducer分区

//分两个区
job.setNumReduceTasks(2);

 

key.hashcode % NumReduceTasks(2)

 

Hadoop基本数据类型

BooleanWritable
ByteWritable
DoubleWritable
FloatWritable
IntWritable
LongWritable
Text
NullWritable
ArrayWritable--Writable类型的数组

 

Hadoop引用类型

自定义引用类有以下几个要求:

MapReduce序列化机制

背景

1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
1363154400022     13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
1363157995074     84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
1363157993055     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
1363157995033     15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
1363157983019    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
1363157984041     13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
1363157995093     13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
1363157982040     13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
1363157986072     18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
1363157990043     13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
1363157988072     13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
1363157985066     13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200

 

思路

  1. map负责收集数据:收集手机号和流量

  2. 最终要统计的流量是3个值:上行流量,下行流量,总流量。这个时候我们可以创建一个类,里面存这3个值。

  3. Reduce负责统计。

  4. 这个时候就有一个问题:我们在第一个程序中传递的值是类型是Hadoop封装的序列化类型,比如String,我们使用的是Text。

  5. 而我们自定义的类如果要传递,也要实现Hadoop的序列化机制。因此引出了MapReduce的序列化机制。

定义

1.  public interface Writable {  
   //序列化方法
2.  void write(DataOutput out) throws IOException;  
   //反序列化方法
3.  void readFields(DataInput in) throws IOException;  
4. }
?

程序示例

package com.rzp.pojo;
?
import org.apache.hadoop.io.Writable;
?
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
?
?
//继承Hadoop序列化接口
//省略了get/set和有参无参构造器
public class Flowbean implements Writable {
    
    private long upFlow;//上行
    private long downFlow;//下行
    private long sumFlow;//
?
    //重写Writable的序列化方法
    public void write(DataOutput out) throws IOException {
        //使用Hadoop封装的方法把基本数据类型序列化
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
?
    //重写Writable的反序列化方法
    public void readFields(DataInput in) throws IOException {
        //注意反序列化的顺序一定要和序列化一样
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
?
    }
?
    //自定义构造器
    public Flowbean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
?
    //增加一个同时设置3个属性的方法,免去map方法中set3次,减少代码量
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
?
    @Override
    public String toString() {
        return upFlow+"\t"+downFlow+"\t"+sumFlow+"\t";
    }
}

 


?
package com.rzp.flosum;
?
import com.rzp.pojo.Flowbean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
?
import java.io.IOException;
?
/**
 * 在mr程序中,可以把自定义类作为mr的数据类型
 * 但是自定义类一定要实现Hadoop序列化接口
 */
public class FlowSumMapper extends Mapper<LongWritable, Text,Text, Flowbean> {
?
    //提高作用域,先new一个出来,重复使用,避免造成大量的内存使用
    Text k = new Text();
    Flowbean v = new Flowbean();
?
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //数据使用制表符分割的
        String[] fields = line.split("\t");
        String phoneNum = fields[1];
        //因为部分数据前面少了一个域名,有些流量是第7位,有些是第6位
        //因此我们反过来获取,倒数第3个和倒数第2个
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);
?
        k.set(phoneNum);
        v.set(upFlow,downFlow);
?
        context.write(k,v);
?
?
    }
}

 


?
package com.rzp.flosum;
?
import com.rzp.pojo.Flowbean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
?
import java.io.IOException;
?
public class FlowSumReducer extends Reducer<Text, Flowbean,Text,Flowbean> {
?
    Flowbean v = new Flowbean();
?
    @Override
    protected void reduce(Text key, Iterable<Flowbean> values, Context context) throws IOException, InterruptedException {
        long upFlowCount = 0;
        long downFlowCount = 0;
        
        //统计流量
        for (Flowbean value : values) {
            upFlowCount += value.getUpFlow();
            downFlowCount += value.getDownFlow();
?
        }
        //写到Flowbean对象中
        v.set(upFlowCount,downFlowCount);
        
        //输出
        context.write(key,v);
    }
}
?

 

package com.rzp.flosum;
?
import com.rzp.pojo.Flowbean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
?
import java.io.IOException;
?
public class FlowSumDrive {
?
    public static void main(String[] args) throws Exception {
        //通过Job这个类封装本次MR要执行的任务
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);
?
        System.setProperty("HADOOP_USER_NAME","root");
?
        //指定本次jap的jar包运行的主类
        job.setJarByClass(FlowSumDrive.class);
?
        //设置Mapper相关属性
        //指定本次的Mapper类
        job.setMapperClass(FlowSumMapper.class);
        //指定Mapper输出的 k,v的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flowbean.class);
        //指定本次执行的数据来源
        //默认在hdfs文件系统的根路径下,以后可以通过main输入的args来使用
        FileInputFormat.setInputPaths(job,new Path("D:\\Hoptest\\flowsum\\input\\1.dat"));
?
        //设置Reducer相关属性
        //指定本次的Reducer类
        job.setReducerClass(FlowSumReducer.class);
        //指定Reducer输出的 k,v的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flowbean.class);
        //指定本次执行最终结果的输出地址
        FileOutputFormat.setOutputPath(job,new Path("D:\\Hoptest\\flowsum\\output"));
?
        job.waitForCompletion(true);
?
    }
}

 


?

执行结果

技术分享图片

 

 

 

MapReduce排序初步

在上一个结果的基础上,要实现按总流量倒序排序的需求。

思路

  1. map输入的内容就是上一个统计的结果。

  2. MapReduce中想要实现排序,就只能是在key上排序,实际上上一个结果可以看到就是按照手机号升序排序的。

  3. MapReduce中对手机号排序的方法其实就是调用了compareTo方法。

  4. 因此我们要自定义排序,首先要把Flowbean作为key,并且重写Flowbean的排序方法即可

  5. 一定要先排序grouping需要的字段

程序示例

    //重写compareTo方法
    public int compareTo(Flowbean o) {
        //o1.compareTo(o2)
        //o1大于o2的时候,o1取-1排在前面
        return this.sumFlow > o.getSumFlow()?-1:1;
    }

 

package com.rzp.flosum2;

import com.rzp.pojo.Flowbean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


//输出的key要使用Flowbean,而手机号作为value,那么MapReduce就会自动按多态调用子类的compareTo方法
public class FlowSumMapper2 extends Mapper<LongWritable, Text,Flowbean,Text> {
    Text v = new Text();
    Flowbean k = new Flowbean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        v.set(phoneNum);
        k.set(upFlow,downFlow);
        context.write(k,v);
    }
}

 

package com.rzp.flosum2;

import com.rzp.pojo.Flowbean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowSumReducer2 extends Reducer<Flowbean, Text,Text,Flowbean> {

    @Override
    protected void reduce(Flowbean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //只有一行数据,只要反过来就可以了
        context.write(values.iterator().next(),key);
    }
}

 

package com.rzp.flosum2;

import com.rzp.pojo.Flowbean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumDrive2 {

    public static void main(String[] args) throws Exception {
        //通过Job这个类封装本次MR要执行的任务
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);

        System.setProperty("HADOOP_USER_NAME","root");

        //指定本次jap的jar包运行的主类
        job.setJarByClass(FlowSumDrive2.class);

        //设置Mapper相关属性
        //指定本次的Mapper类
        job.setMapperClass(FlowSumMapper2.class);
        //指定Mapper输出的 k,v的类型
        job.setMapOutputKeyClass(Flowbean.class);
        job.setMapOutputValueClass(Text.class);
        //指定本次执行的数据来源
        //默认在hdfs文件系统的根路径下,以后可以通过main输入的args来使用
        FileInputFormat.setInputPaths(job,new Path("D:\\Hoptest\\flowsum\\output\\part-r-00000"));

        //设置Reducer相关属性
        //指定本次的Reducer类
        job.setReducerClass(FlowSumReducer2.class);
        //指定Reducer输出的 k,v的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flowbean.class);
        //指定本次执行最终结果的输出地址
        FileOutputFormat.setOutputPath(job,new Path("D:\\Hoptest\\flowsum\\output2"));

        job.waitForCompletion(true);

    }
}
 

 

原文:https://www.cnblogs.com/renzhongpei/p/12635209.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!