I'm trying to sort my data by value.
My approach is to combine the key and value into a composite key,
e.g. (key, value) -> ({key,value}, value),
and to define my own KeyComparator, which compares the value part of the key.
My data is a paragraph of text whose words I need to count.
I run two jobs. The first one does the word count, and its reducer combines the word and its count into a composite key.
This is its output:
is,4 4
the,15 15
ECA,1 1
to,6 6
.....
In the second job, I try to use the composite key to sort by the value.
This is my second mapper (Map2):
    public static class Map2 extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Each input line is "compositeKey<TAB>count", as written by the first job.
            String line = value.toString();
            String w1[] = line.split("\t");
            word.set(w1[0]);
            output.collect(word, new IntWritable(Integer.valueOf(w1[1])));
        }
    }
Here is my KeyComparator:
    public static final class KeyComparator extends WritableComparator {
        public KeyComparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable tp1, WritableComparable tp2) {
            Text t1 = (Text) tp1;
            Text t2 = (Text) tp2;
            // Split each composite key "word,count" and compare the count parts.
            String a[] = t1.toString().split(",");
            String b[] = t2.toString().split(",");
            return a[1].compareTo(b[1]);
        }
    }
This is my second reducer (Reduce2):
    public static class Reduce2 extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            //String cpKey[] = key.toString().split(",");
            Text outputKey = new Text();
            //outputKey.set(cpKey[0]);
            output.collect(key, new IntWritable(sum));
        }
    }
Here is my main function:
    public static void main(String[] args) throws Exception {
        int reduceTasks = 1;
        int mapTasks = 3;
        System.out.println("1. New JobConf...");
        JobConf conf = new JobConf(WordCountV2.class);
        conf.setJobName("WordCount");
        System.out.println("2. Setting output key and value...");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        System.out.println("3. Setting Mapper and Reducer classes...");
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        // set numbers of reducers
        System.out.println("4. Setting number of reduce and map tasks...");
        conf.setNumReduceTasks(reduceTasks);
        conf.setNumMapTasks(mapTasks);
        System.out.println("5. Setting input and output formats...");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        System.out.println("6. Setting input and output paths...");
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        String TempDir = "temp" + Integer.toString(new Random().nextInt(1000) + 1);
        FileOutputFormat.setOutputPath(conf, new Path(TempDir));
        //FileOutputFormat.setOutputPath(conf,new Path(args[1]));
        System.out.println("7. Running job...");
        JobClient.runJob(conf);

        JobConf sort = new JobConf(WordCountV2.class);
        sort.setJobName("sort");
        sort.setMapOutputKeyClass(Text.class);
        sort.setMapOutputValueClass(IntWritable.class);
        sort.setOutputKeyComparatorClass(KeyComparator.class);
        sort.setMapperClass(Map2.class);
        sort.setReducerClass(Reduce2.class);
        sort.setNumReduceTasks(reduceTasks);
        sort.setNumMapTasks(mapTasks);
        sort.setInputFormat(TextInputFormat.class);
        sort.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(sort, TempDir);
        FileOutputFormat.setOutputPath(sort, new Path(args[1]));
        JobClient.runJob(sort);
    }
But the result looks something like this:
is 13
the 32
ECA 21
to 14
. . .
and many words are missing.
If I don't use my KeyComparator, the output goes back to being unsorted, just like the first result I showed above.
Any ideas on how to solve this problem? Thanks!
I'm not sure where you are making the mistake, but what you are trying to do, sorting based on the value, is called Secondary Sort.
It is not a trivial job: you need to create additional classes for partitioning, aggregation, and other pieces, which is clearly explained Here and Here.
Following the instructions in those blogs should help you.
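To illustrate why the ordering of the composite key matters, here is a minimal standalone sketch in plain Java with no Hadoop dependency. It assumes keys of the form "word,count" as in your first job's output. The class CompositeKeyOrder, its members, and the sample keys are hypothetical and made up for this example. A TreeMap stands in for reduce-side grouping, because in the old mapred API the comparator set with setOutputKeyComparatorClass is also used to group keys unless a separate one is set with setOutputValueGroupingComparator. If that is what happens in your job, words with the same count would end up in a single reduce call, which might explain the lost words.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Standalone sketch (hypothetical names): ordering "word,count" composite keys.
class CompositeKeyOrder {

    // Parses the count that follows the last comma of a "word,count" key.
    static int countOf(String compositeKey) {
        return Integer.parseInt(compositeKey.substring(compositeKey.lastIndexOf(',') + 1));
    }

    // Compares only the count part, as text: the approach from the question.
    static final Comparator<String> COUNT_AS_TEXT =
        (a, b) -> a.split(",")[1].compareTo(b.split(",")[1]);

    // Compares the count numerically, then falls back to the whole key,
    // so two different words never compare as equal.
    static final Comparator<String> COUNT_THEN_KEY = (a, b) -> {
        int byCount = Integer.compare(countOf(a), countOf(b));
        return byCount != 0 ? byCount : a.compareTo(b);
    };

    // Mimics grouping: keys that compare as equal are summed into one entry.
    static TreeMap<String, Integer> group(List<String> keys, Comparator<String> order) {
        TreeMap<String, Integer> grouped = new TreeMap<>(order);
        for (String key : keys) {
            grouped.merge(key, countOf(key), Integer::sum);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Made-up sample keys; "is,4" and "of,4" share the same count.
        List<String> keys = Arrays.asList("is,4", "the,15", "ECA,1", "to,6", "of,4");
        System.out.println(group(keys, COUNT_AS_TEXT));
        System.out.println(group(keys, COUNT_THEN_KEY));
    }
}
```

With the value-only comparator, "is,4" and "of,4" collapse into one entry whose sum is 8, and "the,15" sorts before "is,4" because the counts are compared as text. With the second comparator all five keys survive, in numeric order of their counts. In a real job, the body of COUNT_THEN_KEY would go inside KeyComparator.compare.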