Input of the reduce phase is not what I expect in Hadoop (Java)


I'm working on a very simple graph analysis tool in Hadoop using MapReduce. I have a graph that looks like the following (each row represents an edge - in fact, this is a triangle graph):

1 3
3 1
3 2
2 3

Now, I want to use MapReduce to count the triangles in this graph (obviously there is exactly one). The tool is still a work in progress; in the first phase, I try to get a list of all neighbors for each vertex.

My main class looks like the following:

public class TriangleCount {

    public static void main( String[] args ) throws Exception {

        // remove the old output directory
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(new Path("output/"), true);

        JobConf firstPhaseJob = new JobConf(FirstPhase.class);

        firstPhaseJob.setOutputKeyClass(IntWritable.class);
        firstPhaseJob.setOutputValueClass(IntWritable.class);

        firstPhaseJob.setMapperClass(FirstPhase.Map.class);
        firstPhaseJob.setCombinerClass(FirstPhase.Reduce.class);
        firstPhaseJob.setReducerClass(FirstPhase.Reduce.class);

        FileInputFormat.setInputPaths(firstPhaseJob, new Path("input/"));
        FileOutputFormat.setOutputPath(firstPhaseJob, new Path("output/"));

        JobClient.runJob(firstPhaseJob);
    }
}

My Mapper and Reducer implementations look like this; both are quite simple:

public class FirstPhase {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        public void map(LongWritable longWritable, Text graphLine, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(graphLine.toString());
            int n1 = Integer.parseInt(tokenizer.nextToken());
            int n2 = Integer.parseInt(tokenizer.nextToken());
            if(n1 > n2) {
                System.out.println("emitting (" + new IntWritable(n1) + ", " + new IntWritable(n2) + ")");
                outputCollector.collect(new IntWritable(n1), new IntWritable(n2));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, Text> {

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> iterator, OutputCollector<IntWritable, Text> outputCollector, Reporter reporter) throws IOException {
            List<IntWritable> nNodes = new ArrayList<>();
            while(iterator.hasNext()) {
                nNodes.add(iterator.next());
            }

            System.out.println("key: " + key + ", list: " + nNodes);

            // create pairs and emit these
            for(IntWritable n1 : nNodes) {
                for(IntWritable n2 : nNodes) {
                    outputCollector.collect(key, new Text(n1.toString() + " " + n2.toString()));
                }
            }
        }
    }
}

I've added some logging to the program. In the map phase, I print which pairs I'm emitting. In the reduce phase, I print the input of the reduce. I get the following output:

emitting (3, 1)
emitting (3, 2)
key: 3, list: [1, 1]

The input for the reduce function is not what I expect. I expect it to be [1, 2] and not [1, 1]. I believe that Hadoop automatically combines all my emitted pairs from the output of the map phase but am I missing something here? Any help or explanation would be appreciated.

1 answer; the accepted answer is by vanekjar:

This is a typical problem for people beginning with Hadoop MapReduce.

The problem is in your reducer. Hadoop re-uses the value objects it passes to the reducer: the Iterator<IntWritable> keeps a single IntWritable instance around and overwrites it on each call to iterator.next().

That means every element you add to nNodes is a reference to that same instance, so when you call iterator.next() the IntWritable you saved earlier is overwritten with the new value.

You can read more about this pitfall here:
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/
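The fix is to copy the value out of the re-used instance before storing it, e.g. nNodes.add(new IntWritable(iterator.next().get())) in your reduce loop. The effect can also be reproduced without Hadoop; below is a minimal plain-Java sketch in which IntBox and reusingIterator are hypothetical stand-ins for IntWritable and Hadoop's re-using value iterator:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReusePitfall {
    // A minimal stand-in for Hadoop's IntWritable: a mutable int box.
    static class IntBox {
        private int value;
        IntBox(int v) { value = v; }
        void set(int v) { value = v; }
        int get() { return value; }
        @Override public String toString() { return Integer.toString(value); }
    }

    // Mimics Hadoop's value iterator: it returns the SAME IntBox instance
    // on every call, merely updated with the next value.
    static Iterator<IntBox> reusingIterator(int[] values) {
        IntBox shared = new IntBox(0);
        return new Iterator<IntBox>() {
            int i = 0;
            public boolean hasNext() { return i < values.length; }
            public IntBox next() { shared.set(values[i++]); return shared; }
        };
    }

    public static void main(String[] args) {
        int[] values = {1, 2};

        // Buggy: storing the re-used instance means every list entry points
        // at the same object, so all entries end up equal.
        List<IntBox> buggy = new ArrayList<>();
        Iterator<IntBox> it = reusingIterator(values);
        while (it.hasNext()) buggy.add(it.next());
        System.out.println("buggy: " + buggy);   // [2, 2]

        // Fixed: copy the value out before storing it.
        List<IntBox> fixed = new ArrayList<>();
        it = reusingIterator(values);
        while (it.hasNext()) fixed.add(new IntBox(it.next().get()));
        System.out.println("fixed: " + fixed);   // [1, 2]
    }
}
```

The buggy list collapses to repeated copies of a single value, which is why your reduce input looks wrong, while the copying version keeps both distinct values.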