I have a MapReduce job whose map function outputs an IntWritable as the key and a Point object (a class I created that implements Writable) as the value. In the reduce function I then use a for-each loop to go through the Iterable of Points and build a list:
@Override
public void reduce(IntWritable key, Iterable<Point> points, Context context) throws IOException, InterruptedException {
    List<Point> pointList = new ArrayList<>();
    for (Point point : points) {
        pointList.add(point);
    }
    context.write(key, pointList);
}
The problem is that this list is then the correct size, but every Point is exactly the same. The fields in my Point class are not static and I have printed each point individually in the loop to ensure the points are unique (which they are). Furthermore, I have created a separate class that just creates a couple of points and adds them to a list, and this seems to work, which implies that MapReduce does something I am not aware of.
Any help with fixing this would be greatly appreciated.
UPDATE: Code for Mapper class:
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private IntWritable firstChar = new IntWritable();
private Point point = new Point();

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line, " ");
    while (tokenizer.hasMoreTokens()) {
        String atts = tokenizer.nextToken();
        String cut = atts.substring(1, atts.length() - 1);
        String[] nums = cut.split(",");
        point.set(Double.parseDouble(nums[0]), Double.parseDouble(nums[1]), Double.parseDouble(nums[2]), Double.parseDouble(nums[3]));
        context.write(one, point);
    }
}
Point class:
public class Point implements Writable {
    public Double att1;
    public Double att2;
    public Double att3;
    public Double att4;

    public Point() {
    }

    public void set(Double att1, Double att2, Double att3, Double att4) {
        this.att1 = att1;
        this.att2 = att2;
        this.att3 = att3;
        this.att4 = att4;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeDouble(att1);
        dataOutput.writeDouble(att2);
        dataOutput.writeDouble(att3);
        dataOutput.writeDouble(att4);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.att1 = dataInput.readDouble();
        this.att2 = dataInput.readDouble();
        this.att3 = dataInput.readDouble();
        this.att4 = dataInput.readDouble();
    }

    @Override
    public String toString() {
        String output = "{" + att1 + ", " + att2 + ", " + att3 + ", " + att4 + "}";
        return output;
    }
}
The problem is in your reducer. Ideally you would not hold all the points in memory at all: a single key can have a very large number of values, and Hadoop avoids that cost by streaming them to you (admittedly in an awkward way).
While you loop through the given Iterable<Point>, Hadoop re-uses a single Point instance, so only one instance exists at any given time. That means that each time the iterator advances (points.next()), these two things happen:
- The Point instance is re-used and filled with the next point's data.
- The same happens to the key instance.
In your case the List ends up containing one and the same Point instance, inserted multiple times and holding the data of the last point read.
You shouldn't save instances of Writables in your reducer, or you should clone them before storing them. You can read more about this problem here:
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/
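To make the effect concrete, here is a minimal sketch in plain Java with no Hadoop dependency. Everything in it is an assumption made for illustration: ReuseDemo, reusingValues, collectReferences and collectCopies are hypothetical names, the Point is a cut-down two-field stand-in for the one in the question, and the copy constructor is an addition the original class does not have. The iterable only imitates the reuse behaviour described above by refilling one shared instance on every next().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ReuseDemo {
    /** Cut-down stand-in for the question's Point (hypothetical, two fields only). */
    static class Point {
        double att1;
        double att2;

        Point() {
        }

        /** Copy constructor: an assumed addition, not part of the original class. */
        Point(Point other) {
            this.att1 = other.att1;
            this.att2 = other.att2;
        }

        void set(double att1, double att2) {
            this.att1 = att1;
            this.att2 = att2;
        }

        @Override
        public String toString() {
            return "{" + att1 + ", " + att2 + "}";
        }
    }

    /** Imitates the reducer's value iterable: every next() refills the same instance. */
    static Iterable<Point> reusingValues(double[][] rows) {
        final Point shared = new Point();
        return () -> new Iterator<Point>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < rows.length;
            }

            @Override
            public Point next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.set(rows[i][0], rows[i][1]);
                i++;
                return shared;
            }
        };
    }

    /** The pattern from the question: stores the same reference again and again. */
    static List<Point> collectReferences(Iterable<Point> points) {
        List<Point> out = new ArrayList<>();
        for (Point p : points) {
            out.add(p);
        }
        return out;
    }

    /** The fix: store an independent copy of each value. */
    static List<Point> collectCopies(Iterable<Point> points) {
        List<Point> out = new ArrayList<>();
        for (Point p : points) {
            out.add(new Point(p));
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] rows = {{1, 2}, {3, 4}, {5, 6}};
        // prints [{5.0, 6.0}, {5.0, 6.0}, {5.0, 6.0}]
        System.out.println(collectReferences(reusingValues(rows)));
        // prints [{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}]
        System.out.println(collectCopies(reusingValues(rows)));
    }
}
```

In a real reducer the same idea would be to add a copy of each value to the list rather than the value itself. One option that avoids writing a copy constructor is Hadoop's WritableUtils.clone(point, context.getConfiguration()), which should work for any Writable with a no-argument constructor; whether collecting every value in memory is acceptable still depends on how many points a key can have.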