I'm trying to do a secondary sort in MapReduce with a composite key that consists of:
String natural-key = program name
Long key-for-sorting = time in milliseconds since 1970
The problem is that after sorting I get many reduce groups, one per full composite key, instead of one per program name.
By debugging I have verified that the hashCode and compare functions are correct. The debug log below, where each block comes from a different reducer call, suggests that either the grouping or the partitioning didn't succeed:
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=the voice
14/12/14 00:55:12 INFO popularitweet.EtanReducer: the voice: Thu Dec 11 17:51:03 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: the voice: Thu Dec 11 17:51:03 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key the voice ended
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=top gear
14/12/14 00:55:12 INFO popularitweet.EtanReducer: top gear: Thu Dec 11 17:51:04 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key top gear ended
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=american horror story
14/12/14 00:55:12 INFO popularitweet.EtanReducer: american horror story: Thu Dec 11 17:51:04 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key american horror story ended
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key=the voice
14/12/14 00:55:12 INFO popularitweet.EtanReducer: the voice: Thu Dec 11 17:51:04 +0000 2014
14/12/14 00:55:12 INFO popularitweet.EtanReducer: key the voice ended
As you can see, "the voice" is sent to two different reducers, and the only difference between the two keys is the timestamp. Any help would be appreciated. The composite key is the following class:
public class ProgramKey implements WritableComparable<ProgramKey> {
private String program;
private Long timestamp;
public ProgramKey() {
}
public ProgramKey(String program, Long timestamp) {
this.program = program;
this.timestamp = timestamp;
}
@Override
public int compareTo(ProgramKey o) {
int result = program.compareTo(o.program);
if (result == 0) {
result = timestamp.compareTo(o.timestamp);
}
return result;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, program);
dataOutput.writeLong(timestamp);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
program = WritableUtils.readString(dataInput);
timestamp = dataInput.readLong();
}
// getProgram(), getTimestamp() and hashCode() omitted
}
My Partitioner, GroupingComparator, and SortComparator implementations, followed by the job setup, are these:
public class ProgramKeyPartitioner extends Partitioner<ProgramKey, TweetObject> {
@Override
public int getPartition(ProgramKey programKey, TweetObject tweetObject, int numPartitions) {
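int hash = programKey.getProgram().hashCode();
// Mask the sign bit: hashCode() can be negative, and a negative partition is illegal
int partition = (hash & Integer.MAX_VALUE) % numPartitions;
return partition;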
}
}
public class ProgramKeyGroupingComparator extends WritableComparator {
protected ProgramKeyGroupingComparator() {
super(ProgramKey.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
ProgramKey k1 = (ProgramKey) a;
ProgramKey k2 = (ProgramKey) b;
return k1.getProgram().compareTo(k2.getProgram());
}
}
public class TimeStampComparator extends WritableComparator {
protected TimeStampComparator() {
super(ProgramKey.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
ProgramKey ts1 = (ProgramKey)a;
ProgramKey ts2 = (ProgramKey)a;
int result = ts1.getProgram().compareTo(ts2.getProgram());
if (result == 0) {
result = ts1.getTimestamp().compareTo(ts2.getTimestamp());
}
return result;
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// Create configuration
Configuration conf = new Configuration();
// Create job
Job job = new Job(conf, "test1");
job.setJarByClass(EtanMapReduce.class);
// Set partitioner keyComparator and groupComparator
job.setPartitionerClass(ProgramKeyPartitioner.class);
job.setGroupingComparatorClass(ProgramKeyGroupingComparator.class);
job.setSortComparatorClass(TimeStampComparator.class);
// Setup MapReduce
job.setMapperClass(EtanMapper.class);
job.setMapOutputKeyClass(ProgramKey.class);
job.setMapOutputValueClass(TweetObject.class);
job.setReducerClass(EtanReducer.class);
// Specify key / value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TweetObject.class);
// Input
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, outputDir);
job.setOutputFormatClass(TextOutputFormat.class);
// Delete output if exists
FileSystem hdfs = FileSystem.get(conf);
if (hdfs.exists(outputDir))
hdfs.delete(outputDir, true);
// Execute job
logger.info("starting job");
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
Your TimeStampComparator seems to have a typo: you're setting ts2 to a when it should be set to b. You have:
ProgramKey ts2 = (ProgramKey)a;
when it should be:
ProgramKey ts2 = (ProgramKey)b;
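Because both operands are then the same object, compare always returns 0 and the sort treats every key as equal. This results in incorrectly sorted key/value pairs and invalidates the grouping comparator's assumption that the key/value pairs arrive sorted, so records for the same program that are not adjacent end up in separate reduce calls.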
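To make the effect concrete, here is a minimal plain-Java sketch (no Hadoop involved) that imitates the sort-then-group step. The Key class, the sample timestamps 3 and 4, and the reduceGroups helper are all hypothetical stand-ins, not the asker's classes; the real framework sort may order all-equal keys differently, but the fragmentation is the same in kind.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java stand-in for the shuffle: sort the keys, then open a new reduce
// group whenever the grouping comparator sees a different program name.
class SecondarySortDemo {

    // Hypothetical simplified key; the real ProgramKey is a Hadoop WritableComparable.
    static final class Key {
        final String program;
        final long timestamp;

        Key(String program, long timestamp) {
            this.program = program;
            this.timestamp = timestamp;
        }
    }

    // Mirrors the typo: both sides are taken from 'a', so the result is always 0.
    static final Comparator<Key> BUGGY_SORT = (a, b) -> {
        Key ts1 = a;
        Key ts2 = a; // typo: should be b
        int result = ts1.program.compareTo(ts2.program);
        if (result == 0) {
            result = Long.compare(ts1.timestamp, ts2.timestamp);
        }
        return result;
    };

    // Corrected version: program name first, then timestamp.
    static final Comparator<Key> FIXED_SORT = (a, b) -> {
        int result = a.program.compareTo(b.program);
        if (result == 0) {
            result = Long.compare(a.timestamp, b.timestamp);
        }
        return result;
    };

    // Grouping looks at the program name only.
    static final Comparator<Key> GROUPING = (a, b) -> a.program.compareTo(b.program);

    // Returns the program name of each reduce group, in the order the groups start.
    static List<String> reduceGroups(Comparator<Key> sort) {
        List<Key> keys = new ArrayList<>(Arrays.asList(
                new Key("the voice", 3L),
                new Key("top gear", 4L),
                new Key("american horror story", 4L),
                new Key("the voice", 4L)));
        keys.sort(sort);

        List<String> groups = new ArrayList<>();
        Key previous = null;
        for (Key k : keys) {
            // Only adjacent keys are compared, which is why sort order matters.
            if (previous == null || GROUPING.compare(previous, k) != 0) {
                groups.add(k.program);
            }
            previous = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        // prints: buggy: [the voice, top gear, american horror story, the voice]
        System.out.println("buggy: " + reduceGroups(BUGGY_SORT));
        // prints: fixed: [american horror story, the voice, top gear]
        System.out.println("fixed: " + reduceGroups(FIXED_SORT));
    }
}
```

With the buggy comparator, "the voice" starts two separate groups, which matches the pattern in the debug log; with the corrected one, each program name starts exactly one group.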
Check also that the original program names are in UTF-8 as that's what WritableUtils assumes. Is your system's default code page also UTF-8?
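As a quick way to check the encoding point, the sketch below prints the JVM default charset and shows how the same bytes decode to different strings under different charsets. The CharsetCheck class and the sample name "café" are hypothetical; it assumes the program names reach the mapper as raw bytes that get decoded somewhere along the way.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Shows that one byte sequence can turn into two different program names,
// depending on which charset is used to decode it.
class CharsetCheck {

    // Decode raw bytes with the given charset, as input handling code might do.
    static String decode(byte[] raw, Charset charset) {
        return new String(raw, charset);
    }

    public static void main(String[] args) {
        System.out.println("default charset: " + Charset.defaultCharset());

        // Hypothetical program name containing a non-ASCII character.
        String original = "caf\u00e9";
        byte[] raw = original.getBytes(StandardCharsets.UTF_8);

        String asUtf8 = decode(raw, StandardCharsets.UTF_8);
        String asLatin1 = decode(raw, StandardCharsets.ISO_8859_1);

        System.out.println("UTF-8 round trip equal:   " + original.equals(asUtf8));   // true
        System.out.println("Latin-1 round trip equal: " + original.equals(asLatin1)); // false
    }
}
```

If two records carry the same program name but one was decoded with the wrong charset, their keys would compare as different and land in different groups even with a correct comparator.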