Output of one MapReduce program as input to another MapReduce program


I am trying a simple example in which the output of one MapReduce job should be the input of another MapReduce job.

The flow should be: Mapper1 --> Reducer1 --> Mapper2 --> Reducer2. The output of Mapper1 must be the input of Reducer1, the output of Reducer1 the input of Mapper2, and the output of Mapper2 the input of Reducer2. The output of Reducer2 must be stored in an output file.

How can I add multiple Mappers and Reducers to my program so that this flow is maintained?

Do I need to use ChainMapper or ChainReducer? If so, how do I use them?



Answer by Fabian Hueske

You need to implement two separate MapReduce jobs for that. The result of the first job must be written to persistent storage (such as HDFS), from which the second job reads it. SequenceFileOutputFormat (for the first job) and SequenceFileInputFormat (for the second job) are often used for this intermediate data. Both MapReduce jobs can be launched from the same driver program.
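To make the data flow concrete without a cluster, here is a hypothetical, dependency-free Java sketch that mimics two chained jobs in memory. It is not the Hadoop API: runJob stands in for one MapReduce job (map, group and sort by key, reduce), and the intermediate list stands in for the first job's output directory that the second job reads. The class and method names and the example workload (word count, then grouping words by their count) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ChainedJobsSketch {

    /**
     * Runs one in-memory "job": a map phase, a shuffle that groups values
     * by key (keys sorted, as in MapReduce), and one reduce call per key.
     */
    static <I, K extends Comparable<K>, V, O> List<O> runJob(
            List<I> input,
            Function<I, List<Map.Entry<K, V>>> mapper,
            BiFunction<K, List<V>, List<O>> reducer) {
        TreeMap<K, List<V>> groups = new TreeMap<>();
        for (I record : input) {
            for (Map.Entry<K, V> kv : mapper.apply(record)) {
                groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        List<O> output = new ArrayList<>();
        for (Map.Entry<K, List<V>> group : groups.entrySet()) {
            output.addAll(reducer.apply(group.getKey(), group.getValue()));
        }
        return output;
    }

    /** Job 1 (word count) followed by job 2 (group words by their count). */
    static List<String> runPipeline(List<String> lines) {
        // Mapper1: emit (word, 1) for every word in a line.
        Function<String, List<Map.Entry<String, Integer>>> mapper1 = line -> {
            List<Map.Entry<String, Integer>> out = new ArrayList<>();
            for (String word : line.split(" ")) {
                out.add(Map.entry(word, 1));
            }
            return out;
        };
        // Reducer1: sum the ones for each word.
        BiFunction<String, List<Integer>, List<Map.Entry<String, Integer>>> reducer1 =
                (word, ones) -> List.of(Map.entry(word, ones.stream().mapToInt(Integer::intValue).sum()));

        // Stands in for job 1's persisted output, which job 2 reads as its input.
        List<Map.Entry<String, Integer>> intermediate = runJob(lines, mapper1, reducer1);

        // Mapper2: swap each pair to (count, word).
        Function<Map.Entry<String, Integer>, List<Map.Entry<Integer, String>>> mapper2 =
                kv -> List.of(Map.entry(kv.getValue(), kv.getKey()));
        // Reducer2: one output line per count, listing the words with that count.
        BiFunction<Integer, List<String>, List<String>> reducer2 =
                (count, words) -> List.of(count + "\t" + String.join(",", words));

        return runJob(intermediate, mapper2, reducer2);
    }

    public static void main(String[] args) {
        for (String line : runPipeline(List.of("a b a", "c a b c"))) {
            System.out.println(line);
        }
    }
}
```

With the input lines "a b a" and "c a b c", job 1 produces (a, 3), (b, 2), (c, 2), and job 2 turns that into the two lines "2<TAB>b,c" and "3<TAB>a". On a real cluster, the hand-off is simply the first job's output path being added as the second job's input path.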

Answer by ViKiG

What you are looking for is probably ControlledJob and JobControl, which fit your purpose well. In a single driver class you can build multiple jobs that depend on each other. The following code might help:

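    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Job 1: Mapper1 -> Reducer1 (also set output key/value classes as needed)
    Job jobOne = Job.getInstance(jobOneConf, "Job-1");
    jobOne.setMapperClass(Mapper1.class);
    jobOne.setReducerClass(Reducer1.class);
    FileInputFormat.addInputPath(jobOne, jobOneInput);
    FileOutputFormat.setOutputPath(jobOne, jobOneOutput);
    ControlledJob jobOneControl = new ControlledJob(jobOneConf);
    jobOneControl.setJob(jobOne);

    // Job 2: Mapper2 -> Reducer2
    Job jobTwo = Job.getInstance(jobTwoConf, "Job-2");
    jobTwo.setMapperClass(Mapper2.class);
    jobTwo.setReducerClass(Reducer2.class);
    FileInputFormat.addInputPath(jobTwo, jobOneOutput); // job-1's output is job-2's input
    FileOutputFormat.setOutputPath(jobTwo, jobTwoOutput); // final output
    ControlledJob jobTwoControl = new ControlledJob(jobTwoConf);
    jobTwoControl.setJob(jobTwo);

    JobControl jobControl = new JobControl("job-control");
    jobControl.add(jobOneControl);
    jobControl.add(jobTwoControl);
    jobTwoControl.addDependingJob(jobOneControl); // job-2 waits until job-1 has succeeded

    Thread jobControlThread = new Thread(jobControl);
    jobControlThread.start();
    // JobControl.run() keeps looping until stop() is called, so poll before joining
    while (!jobControl.allFinished()) {
        Thread.sleep(500);
    }
    jobControl.stop();
    jobControlThread.join();

Calling jobControlThread.join() right after start() would block indefinitely, because JobControl.run() does not return on its own when the jobs finish; polling jobControl.allFinished() and then calling jobControl.stop() lets the thread exit.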
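To illustrate what addDependingJob and the JobControl loop do conceptually, here is a toy Java model. It is not Hadoop's implementation; ToyJobControl, ToyJob and demo are hypothetical names. The control loop repeatedly scans its jobs and runs any job whose dependencies are all done. Like the real JobControl, its run() method keeps looping until stop() is called, so the caller polls allFinished() before stopping and joining the thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ToyJobControl implements Runnable {

    /** A hypothetical job: a name, an action, and the jobs it depends on. */
    static final class ToyJob {
        final String name;
        final Runnable action;
        final List<ToyJob> dependsOn = new ArrayList<>();
        volatile boolean done = false;

        ToyJob(String name, Runnable action) {
            this.name = name;
            this.action = action;
        }

        void addDependingJob(ToyJob other) { dependsOn.add(other); }

        // A job may run once it has not run yet and all its dependencies are done.
        boolean ready() { return !done && dependsOn.stream().allMatch(d -> d.done); }
    }

    private final List<ToyJob> jobs = new ArrayList<>();
    final List<String> runOrder = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean stopped = false;

    void add(ToyJob job) { jobs.add(job); }

    boolean allFinished() { return jobs.stream().allMatch(j -> j.done); }

    void stop() { stopped = true; }

    /** Loops until stop() is called, not merely until all jobs are done. */
    @Override
    public void run() {
        while (!stopped) {
            for (ToyJob job : jobs) {
                if (job.ready()) {
                    job.action.run();
                    runOrder.add(job.name);
                    job.done = true;
                }
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    static List<String> demo() {
        ToyJobControl control = new ToyJobControl();
        ToyJob one = new ToyJob("job-1", () -> {});
        ToyJob two = new ToyJob("job-2", () -> {});
        two.addDependingJob(one);
        control.add(two); // added first on purpose: the dependency decides the order
        control.add(one);

        Thread thread = new Thread(control);
        thread.start();
        try {
            while (!control.allFinished()) {
                Thread.sleep(10);
            }
            control.stop();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(control.runOrder);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even though job-2 is registered first, it only runs after job-1 is done, which is the effect addDependingJob has in the Hadoop driver.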