failed to set a KeyComparator function

I'm trying to sort the data by value.

The method I use is to combine the key and value into a composite key,

e.g. (key, value) -> ({key,value}, value)

and define my own KeyComparator, which compares the value part of the key.

My data is a paragraph of text in which I need to count the words.

I run two jobs. The first one does the word count, but its reducer combines the key and the count into a composite key.

This is the result of the first job:

is,4 4
the,15 15
ECA,1 1
to,6 6
.....

In the second job, I try to use the composite key to sort by the value.

This is my mapper2:

public static class Map2 extends MapReduceBase
    implements Mapper<LongWritable,Text,Text,IntWritable>{

            private Text word = new Text();
            public void map(LongWritable key, Text value, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
                    String line = value.toString();
                    String w1[] = line.split("\t");
                    word.set(w1[0]);
                    output.collect(word,new IntWritable(Integer.valueOf(w1[1])));
            }
    }

And here is my KeyComparator:

public static final class KeyComparator extends WritableComparator {
    public KeyComparator() {
        // true: create key instances so compare() receives deserialized Text objects
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable tp1, WritableComparable tp2) {
        Text t1 = (Text) tp1;
        Text t2 = (Text) tp2;
        // composite key is "word,count"; index 1 is the count part
        String a[] = t1.toString().split(",");
        String b[] = t2.toString().split(",");
        // compares the count parts as strings
        return a[1].compareTo(b[1]);
    }
}

This is my reducer2:

public static class Reduce2 extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        //String cpKey[] = key.toString().split(",");
        Text outputKey = new Text();
        //outputKey.set(cpKey[0]);
        output.collect(key, new IntWritable(sum));
    }
}

Here is my main function:

 public static void main(String[] args) throws Exception {
            int reduceTasks = 1;
            int mapTasks = 3;

            System.out.println("1. New JobConf...");
            JobConf conf = new JobConf(WordCountV2.class);
            conf.setJobName("WordCount");

            System.out.println("2. Setting output key and value...");
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            System.out.println("3. Setting Mapper and Reducer classes...");
            conf.setMapperClass(Map.class);
            conf.setReducerClass(Reduce.class);

            // set numbers of reducers
            System.out.println("4. Setting number of reduce and map tasks...");
            conf.setNumReduceTasks(reduceTasks);
            conf.setNumMapTasks(mapTasks);

            System.out.println("5. Setting input and output formats...");
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);


            System.out.println("6. Setting input and output paths...");
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            String TempDir = "temp" + Integer.toString(new Random().nextInt(1000)+1);
            FileOutputFormat.setOutputPath(conf, new Path(TempDir));
            //FileOutputFormat.setOutputPath(conf,new Path(args[1]));
            System.out.println("7. Running job...");
            JobClient.runJob(conf);
            JobConf sort = new JobConf(WordCountV2.class);
            sort.setJobName("sort");
            sort.setMapOutputKeyClass(Text.class);
            sort.setMapOutputValueClass(IntWritable.class);
            sort.setOutputKeyComparatorClass(KeyComparator.class);
            sort.setMapperClass(Map2.class);
            sort.setReducerClass(Reduce2.class);
            sort.setNumReduceTasks(reduceTasks);
            sort.setNumMapTasks(mapTasks);
            sort.setInputFormat(TextInputFormat.class);
            sort.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(sort,TempDir);
            FileOutputFormat.setOutputPath(sort, new Path(args[1]));
            JobClient.runJob(sort);


    }

But the result looks something like this:

is 13
the 32
ECA 21
to 14
. . .

and many words are lost.

But if I don't use my KeyComparator, the result is not sorted, just like the first output I showed above.

Any ideas on how to solve the problem? Thanks!

There is 1 answer

Mr.Chowdary On BEST ANSWER

I'm not sure where you are making the mistake.
But what you are trying to do, sorting based on value, is called Secondary Sort.
It's not a trivial job: you need to create more classes for partitioning, aggregation and other stuff, which is clearly explained Here and Here.
Following the instructions in those blogs should help you.
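
As a rough illustration of why words can go missing with a value-only comparator, here is a minimal sketch in plain Java (no Hadoop classes involved). It assumes keys of the form word,count, as in the first job's output. The class CompositeKeyOrder and everything in it are hypothetical names made up for this example. The TreeSet only stands in for the way keys that compare as equal are treated as one key; in the old mapred API, as far as I know, the sort comparator is also used for grouping when no separate grouping comparator is set. The sketch also compares counts as integers, since comparing them as strings would put "15" before "4".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, not part of Hadoop: models the ordering that a
// WritableComparator.compare() override would need to implement.
final class CompositeKeyOrder {

    // Count part of a "word,count" key. Using lastIndexOf (an assumption about
    // the data) keeps this working even if the word itself contains a comma.
    static int countOf(String key) {
        return Integer.parseInt(key.substring(key.lastIndexOf(',') + 1));
    }

    // Value-only ordering: two keys with the same count compare as equal.
    static final Comparator<String> BY_COUNT_ONLY =
            Comparator.comparingInt(CompositeKeyOrder::countOf);

    // Count first, then the whole key as a tie-breaker, so that two
    // different keys never compare as equal.
    static final Comparator<String> BY_COUNT_THEN_KEY =
            BY_COUNT_ONLY.thenComparing(Comparator.naturalOrder());

    // Returns the keys that survive as distinct under the given ordering.
    // A TreeSet silently drops any element its comparator reports as equal
    // to one already present.
    static List<String> distinctUnder(Comparator<String> order, List<String> keys) {
        TreeSet<String> set = new TreeSet<>(order);
        set.addAll(keys);
        return new ArrayList<>(set);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("is,4", "the,15", "ECA,1", "to,6", "of,4");
        // "of,4" is dropped here because it compares equal to "is,4"
        System.out.println(distinctUnder(BY_COUNT_ONLY, keys));
        // all five keys are kept, ordered by count and then by key
        System.out.println(distinctUnder(BY_COUNT_THEN_KEY, keys));
    }
}
```

In a WritableComparator the same ordering would go inside compare(), after converting both Text keys with toString(). With the tie-breaker in place every composite key is distinct, so each reduce call should see a single value.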