Hadoop (11): Composite MapReduce Jobs, Overview and Patterns


Composite Job Overview

A real computation often needs more than one MapReduce job. Hadoop offers three ways to compose jobs: run them in sequence, wire them into a dependency graph with JobControl, or chain several mappers and one reducer inside a single job with ChainMapper/ChainReducer.

Sequentially Composed MapReduce Jobs

package com.rzp.linemr;

import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

// Composite MR job example: the wordcount output is sorted by keyword (dictionary order);
// the goal here is to re-sort it by occurrence count, so job1 runs the count and job2 re-sorts.
public class Demo1 {

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        // Create job1, job2, ... in order
        Job job1 = createJobByTimes(1);
        Job job2 = createJobByTimes(2);
        // Run the jobs; the run order here must match the execution order of the
        // composed MR jobs, because each job consumes its predecessor's output
        runJob(job1, 1);
        runJob(job2, 2);
        System.out.println("All jobs completed successfully");
    }

    // Run a job, throwing an exception if it fails
    public static void runJob(Job job, int times) throws InterruptedException, IOException, ClassNotFoundException {
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException("Job #" + times + " failed");
        }
    }

    /**
     * Create a job for the given parameter times,
     * where times is the job's position in the composite MR sequence.
     */
    public static Job createJobByTimes(int times) {
        // Configured like any ordinary MR job, from InputFormat through OutputFormat
        // TODO create the Job
        return null;
    }
}
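The createJobByTimes stub above returns null. As a minimal sketch of what it could look like for this wordcount-then-re-sort scenario: WcMapper, WcReducer, SortByCountMapper, SortByCountReducer, and all paths below are hypothetical placeholders, not part of the original post. The point to notice is that job2 reads the directory job1 writes, which is what makes the sequential order mandatory.

// Minimal sketch, assuming hypothetical mapper/reducer classes and paths.
// Extra imports assumed: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
// org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text,
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public static Job createJobByTimes(int times) throws IOException {
    Job job = Job.getInstance(new Configuration(), "job-" + times);
    job.setJarByClass(Demo1.class);
    if (times == 1) {
        // job1: plain wordcount
        job.setMapperClass(WcMapper.class);              // hypothetical
        job.setReducerClass(WcReducer.class);            // hypothetical
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/input"));        // hypothetical
        FileOutputFormat.setOutputPath(job, new Path("/tmp/wc-out")); // hypothetical
    } else {
        // job2: re-sorts job1's output by count, so it reads what job1 wrote
        job.setMapperClass(SortByCountMapper.class);     // hypothetical
        job.setReducerClass(SortByCountReducer.class);   // hypothetical
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/wc-out"));   // job1's output
        FileOutputFormat.setOutputPath(job, new Path("/output"));     // hypothetical
    }
    return job;
}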

 



Dependency-Based Composition (JobControl)

package com.rzp.linemr;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class Demo2 {

    public static void main(String[] args) throws IOException {
        // createControlledJob() wraps an ordinary mapreduce.Job in a controllable ControlledJob
        ControlledJob job1 = createControlledJob(createJobByTimes(1));
        ControlledJob job2 = createControlledJob(createJobByTimes(2));
        ControlledJob job3 = createControlledJob(createJobByTimes(3));
        // Declare the dependencies: job3 depends on job1 and job2.
        // addDependingJob returns a boolean that can be checked for validation.
        job3.addDependingJob(job1);
        job3.addDependingJob(job2);
        // Build the job flow
        JobControl jc = new JobControl("dependency-composition-demo");
        // Add the jobs; the order of the addJob calls does not matter
        jc.addJob(job1);
        jc.addJob(job2);
        jc.addJob(job3);
        // Total number of jobs in the flow (job1, job2, job3). Newly added jobs start
        // in the WAITING state, so counting getReadyJobsList() here would yield 0.
        int totalSize = 3;
        // Run the job flow.
        // JobControl implements Runnable, so it could be run directly:
        // jc.run();
        // Running it on a separate Thread is preferred, because run() loops until
        // stopped and the main thread is needed to poll for completion.

        boolean succeeded = false; // whether the whole job flow succeeded
        try {
            new Thread(jc).start();
            while (!jc.allFinished()) {
                // Not finished yet; keep polling
                try {
                    Thread.sleep(30000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // Stop the flow
            jc.stop();
            if (jc.allFinished() && jc.getSuccessfulJobList().size() == totalSize) {
                // Everything finished and the number of successful jobs equals the
                // total number of jobs, so the whole flow succeeded
                succeeded = true;
            }
        }
        System.out.println("Job flow " + (succeeded ? "succeeded" : "failed"));
    }

    // Wrap an ordinary mapreduce.Job in a controllable ControlledJob
    public static ControlledJob createControlledJob(Job job) throws IOException {
        ControlledJob cj = new ControlledJob(job.getConfiguration());
        cj.setJob(job); // attach the underlying Job
        return cj;
    }

    // Configured like any ordinary MR job, from InputFormat through OutputFormat
    public static Job createJobByTimes(int times) {
        // TODO create the Job
        return null;
    }
}
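When the flow fails, JobControl can also report which jobs failed. A small optional addition that could go right after jc.stop() in the finally block; getFailedJobList(), getJobName(), and getMessage() are existing JobControl/ControlledJob methods, but this snippet is an addition, not part of the original post.

// Report any failed jobs after stopping the flow
for (ControlledJob failed : jc.getFailedJobList()) {
    System.err.println("Failed job: " + failed.getJobName() + " / " + failed.getMessage());
}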

 

Chained MapReduce (ChainMapper / ChainReducer)

package com.rzp.linemr;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class Demo3 {

    public static void main(String[] args) throws Exception {
        // ChainMapper/ChainReducer in org.apache.hadoop.mapred.lib belong to the old
        // API, so the chain is configured on a JobConf rather than a mapreduce.Job
        JobConf job = new JobConf(Demo3.class);
        job.setJobName("chain-demo");
        FileInputFormat.addInputPath(job, new Path(""));   // TODO overall input path
        FileOutputFormat.setOutputPath(job, new Path("")); // TODO overall output path

        /**
         * addMapper parameters:
         * klass          - the Mapper class to add
         * K1, V1, K2, V2 - klass's input and output key/value types
         * byValue        - whether pairs are passed to the next stage by value
         * mapperConf     - configuration local to this mapper
         */

        // Add the first mapper (IdentityMapper is a placeholder for a real mapper)
        ChainMapper.addMapper(job, IdentityMapper.class,
                LongWritable.class, Text.class,
                LongWritable.class, Text.class,
                true, new JobConf(false));

        // Add the second mapper; its input types must match the first mapper's output types
        ChainMapper.addMapper(job, IdentityMapper.class,
                LongWritable.class, Text.class,
                LongWritable.class, Text.class,
                true, new JobConf(false));

        // Set the reducer; the parameters follow the same pattern as addMapper
        ChainReducer.setReducer(job, IdentityReducer.class,
                LongWritable.class, Text.class,
                LongWritable.class, Text.class,
                true, new JobConf(false));

        // Mappers that run after the reducer are added with ChainReducer.addMapper,
        // same pattern again. Note the chain allows at most one reducer: the overall
        // pipeline is [MAP+ / REDUCE MAP*]
        ChainReducer.addMapper(job, IdentityMapper.class,
                LongWritable.class, Text.class,
                LongWritable.class, Text.class,
                true, new JobConf(false));

        // TODO set the final map/reduce output key and value types, then submit
        JobClient.runJob(job);
    }
}
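The sketch above sticks to the old org.apache.hadoop.mapred API, which is where the addMapper signature with the byValue flag lives. Hadoop also ships a new-API chain in org.apache.hadoop.mapreduce.lib.chain, configured directly on a Job and without the byValue parameter. A minimal sketch follows; AMapper, BMapper, and TheReducer are hypothetical classes, and the key/value types simply have to line up from one stage to the next.

// New-API chain: org.apache.hadoop.mapreduce.lib.chain.ChainMapper / ChainReducer
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "chain-new-api");
job.setJarByClass(Demo3.class);
ChainMapper.addMapper(job, AMapper.class,                  // hypothetical
        LongWritable.class, Text.class, Text.class, Text.class,
        new Configuration(false));
ChainMapper.addMapper(job, BMapper.class,                  // hypothetical
        Text.class, Text.class, Text.class, IntWritable.class,
        new Configuration(false));
ChainReducer.setReducer(job, TheReducer.class,             // hypothetical
        Text.class, IntWritable.class, Text.class, IntWritable.class,
        new Configuration(false));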

 

 

Source: https://www.cnblogs.com/renzhongpei/p/12635278.html
