多个MapReduce前后依赖的实现方式

map1/reduce1的输出结果作为map2的输入,reduce2的输出结果作为map3的输入,以此类推,形成pipeline流。使用ControlledJob和JobControl
可以控制作业之间的前后依赖关系,例如:cj2.addDependingJob(cj1);

// Input of the first job and final output of the whole pipeline.
String infile = "input1.txt";
String outfile = "output.txt";

// ---- Job 1: reads `infile`, writes its result to the intermediate directory `out1`.
JobConf jobconf1 = new JobConf(new Configuration());
Path in1 = new Path(infile);
// NOTE(review): Hadoop rejects an output directory that already exists, and
// "/tmp/" usually does -- consider a dedicated sub-directory; confirm for your cluster.
Path out1 = new Path("/tmp/");
FileInputFormat.setInputPaths(jobconf1, in1);
TextOutputFormat.setOutputPath(jobconf1, out1);
jobconf1.setJobName("Job1");
// Both the output key and the output value are emitted as Text.
jobconf1.setOutputKeyClass(Text.class);
jobconf1.setOutputValueClass(Text.class);
jobconf1.setMapperClass(MyMapper1.class);
jobconf1.setReducerClass(MyReducer1.class);
jobconf1.setOutputFormat(TextOutputFormat.class);
// Wrap the finished configuration in a Job; every setting must be applied before this point.
Job job1 = new Job(jobconf1);

// ---- Job 2: second stage of the pipeline.
Configuration baseConf2 = new Configuration();
JobConf jobconf2 = new JobConf(baseConf2);
jobconf2.setJobName("Job2");

// Processing classes for this stage.
jobconf2.setMapperClass(MyMapper2.class);
jobconf2.setReducerClass(MyReducer2.class);

// Output is written as Text/Text pairs through TextOutputFormat.
jobconf2.setOutputFormat(TextOutputFormat.class);
jobconf2.setOutputKeyClass(Text.class);
jobconf2.setOutputValueClass(Text.class);

// The output directory of job1 is the input of job2; the final result goes to `outfile`.
Path out2 = new Path(outfile);
FileInputFormat.setInputPaths(jobconf2, out1);
TextOutputFormat.setOutputPath(jobconf2, out2);

Job job2 = new Job(jobconf2);

// Express the ordering through ControlledJob: cj2 is declared to depend on cj1,
// so the controller holds cj2 back until cj1 is done.
ControlledJob cj1 = new ControlledJob(job1, null);
ControlledJob cj2 = new ControlledJob(job2, null);
cj2.addDependingJob(cj1);

// Register both jobs with a single controller (group name "job").
JobControl jc = new JobControl("job");
jc.addJob(cj1);
jc.addJob(cj2);

// JobControl.run() keeps polling until stop() is called, so invoking it directly
// on the calling thread would never return. Drive it from a background thread and
// stop it once every registered job has finished.
Thread controller = new Thread(jc, "job-control");
controller.setDaemon(true);
controller.start();
try {
    while (!jc.allFinished()) {
        Thread.sleep(500);
    }
} catch (InterruptedException e) {
    // Preserve the interrupt status for the caller and fall through to stop().
    Thread.currentThread().interrupt();
} finally {
    jc.stop();
}