Hadoop Secondary sort - to use or not use

292 views Asked by At

I have accidents input data from Traffic Data Analysis . Some of the columns are :

Accident Id, Accident Date, Day of week

1, 1/1/1979, 5 (Thursday)

2, 1/2/1979, 6 (Friday)

.......

3, 1/1/1980, 0 (Sunday)

I am trying to solve following :

Find number of accidents per year per day

so output should look like :

where Key is (Year, Day of week)

and Value= Number of accidents on that day Here line 1 represents , year =1979 Day = Sunday and number of accidents =500 and so on.

1979,1     500

1979,2    1500

1979,3    2500

1979,4    3500

1979,5    4500

1979,6    5500

1979,7    6500

1980,1     500

1980,2    1500

1980,3    2500

1980,4    3500

1980,5    4500

In this scenario , I am trying to solve it using secondary sort method . Is that correct way to solve this problem ?

If secondary sort is correct way , its not working for me . Here is the key class, mapper and reducer. But my output doesn't come as expected . Please help ..

public class DOW implements WritableComparable<DOW> {
    private Text year;
    private Text day;

    // private final Text count;

    // private int count;
    public DOW() {
        this.year = new Text();
        this.day = new Text();
        // this.count = count;
    }

    public DOW(Text year, Text day) {
        this.year = year;
        this.day = day;
        // this.count = count;
    }

    public Text getYear() {
        return this.year;
    }

    public void setYear(Text year) {
        this.year = year;
    }

    public Text getDay() {
        return this.day;
    }

    public void setDay(Text day) {
        this.day = day;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        year.readFields(in);
        day.readFields(in);

    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        year.write(out);
        day.write(out);
    }

    @Override
    public int compareTo(DOW o) {
        // TODO Auto-generated method stub
        int cmp = year.compareTo(o.year);
        if (cmp != 0) {
            return cmp;
        }
        return o.day.compareTo(this.day);
    }

    @Override
    public String toString() {
        // TODO Auto-generated method stub
        return year + "," + day;
    }

    @Override
    public boolean equals(Object o) {
        // TODO Auto-generated method stub
        if (o instanceof DOW) {
            DOW tp = (DOW) o;
            return year.equals(tp.year) && day.equals(tp.day);
        }
        return false;
    }

    @Override
    public int hashCode() {
        // TODO Auto-generated method stub
        return year.hashCode() * 163 + day.hashCode();
    }
}
public class AccidentDowDemo extends Configured implements Tool {

    public static class DOWMapper extends Mapper<LongWritable, Text, DOW, IntWritable> {
        private static final Logger sLogger = Logger.getLogger(DOWMapper.class);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {

            if (value.toString().contains(",")) {
                String[] array = value.toString().split(",");
                if (!array[9].equals("Date")) {
                    Date dt = null;
                    try {
                        dt = new SimpleDateFormat("dd/mm/yyyy").parse(array[9]);

                    } catch (ParseException e) {
                        // TODO Auto-generated catch block

                        e.printStackTrace();
                    }

                    int year = dt.getYear();

                    int day = Integer.parseInt(array[10].toString());
                                        context.write(new DOW(new Text(Integer.toString(year)),
                            new Text(Integer.toString(day))),
                            new IntWritable(1));
                }
            }
        };
    }

    public static class DOWReducer extends Reducer<DOW, IntWritable, DOW, IntWritable> {
        private static final Logger sLogger = Logger
                .getLogger(DOWReducer.class);

        @Override
        protected void reduce(DOW key, Iterable<IntWritable> values,
                Context context) throws java.io.IOException,
                InterruptedException {
            int count = 0;
            sLogger.info("key =" + key);
            for (IntWritable x : values) {
                int val = Integer.parseInt(x.toString());
                count = count + val;
            }
            context.write(key, new IntWritable(count));
        };
    }

    public static class FirstPartitioner extends Partitioner<DOW, IntWritable> {

        @Override
        public int getPartition(DOW key, IntWritable value, int numPartitions) {
            // TODO Auto-generated method stub

            return Math.abs(Integer.parseInt(key.getYear().toString()) * 127)
                    % numPartitions;
        }
    }

    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(DOW.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            // TODO Auto-generated method stub

            DOW ip1 = (DOW) w1;
            DOW ip2 = (DOW) w2;
            int cmp = ip1.getYear().compareTo(ip2.getYear());
            if (cmp == 0) {
                cmp = -1 * ip1.getDay().compareTo(ip2.getDay());
            }
            return cmp;
        }
    }

    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(DOW.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {

            // TODO Auto-generated method stub
            DOW ip1 = (DOW) w1;
            DOW ip2 = (DOW) w2;
            return ip1.getYear().compareTo(ip2.getYear());
        }
    }
}
2

There are 2 answers

3
alexeipab On BEST ANSWER

If you need to basically simulate

select year, day, count(*) as totalPerDay from DATA group by year, day

than you do not need secondary sort.

But if you need to produce something like a CUBE, where you need to calculate total per year and total per week in one MR job, than secondary sort is the way to go.

9
Vignesh I On

It is more or less kind of a secondary sorting but is not. The problem is with GroupComparator, the comparison has to be done both on year and day. The idea of groupcomparator is to make sure that the same year does go into same reducer but here we don't need that,instead the data has to go into same reducer if it has same year and same day(1979 and sunday). It should look something like this.

package accidentexercise;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ClassGroupComparator extends WritableComparator
{
    protected ClassGroupComparator()
    {
        super(TextpairWritable.class,true);
    }
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable w,WritableComparable w1)
    {
        TextpairWritable s=(TextpairWritable)w;
        TextpairWritable s1=(TextpairWritable)w1;
        int cmp= s.year.compareTo(s1.year);

            if(cmp==0)
            {
                cmp= -1*s.day.compareTo(s1.day);
            }
            return cmp;
    }
}

I am pasting my whole code as well.

TextpairWritable:

package accidentexercise;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextpairWritable implements WritableComparable<TextpairWritable>
{
    Text year=new Text();
    Text day=new Text();
    public TextpairWritable()
    {
        this.year=new Text();
        this.day=new Text();
    }
    public TextpairWritable(Text year,Text day)
    {
        this.year=year;
        this.day=day;
    }

    public TextpairWritable(String year,String day)
    {
        this.year=new Text(year);
        this.day=new Text(day);
    }
    public TextpairWritable(TextpairWritable o)
    {
        this.year=o.year;
        this.day=o.day;
    }
    public void set(Text year,Text day)
    {
        this.year=year;
        this.day=day;
    }
    public Text getyear()
    {
        return this.year;
    }
    public Text getday()
    {
        return this.day;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        year.readFields(in);
        day.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        year.write(out);
        day.write(out);
    }

    public String toString()
    {
        return year+" "+day;
    }

    public int compareTo(TextpairWritable o)
    {
        int cmp=year.compareTo(day);
        if(cmp==0)
        {
            cmp=day.compareTo(day);
        }
        return cmp;
    }
}

GroupComparator:


package accidentexercise;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ClassGroupComparator extends WritableComparator
{
    protected ClassGroupComparator()
    {
        super(TextpairWritable.class,true);
    }
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable w,WritableComparable w1)
    {
        TextpairWritable s=(TextpairWritable)w;
        TextpairWritable s1=(TextpairWritable)w1;
        int cmp= s.year.compareTo(s1.year);

            if(cmp==0)
            {
                cmp= -1*s.day.compareTo(s1.day);
            }
            return cmp;
    }
}

SortComparator:
package accidentexercise;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ClassSortComparator extends WritableComparator
{
    protected ClassSortComparator()
    {
        super(TextpairWritable.class,true);
    }
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable w,WritableComparable w1)
    {
        TextpairWritable s=(TextpairWritable)w;
        TextpairWritable s1=(TextpairWritable)w1;
        int cmp=s.year.compareTo(s1.year);
        if(cmp==0)
        {
            cmp= -1*s.day.compareTo(s1.day);
        }
        return cmp;
    }

}
Mapper:
package accidentexercise;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClassMapper extends Mapper<LongWritable,Text,TextpairWritable,IntWritable>
{
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
    {

        Logger log=LoggerFactory.getLogger(ClassMapper.class) ;
        String s=value.toString();
        String[] orig_data=s.split(",");

        SimpleDateFormat df=new SimpleDateFormat("dd/MM/yyyy");
        df.setLenient(false);
        try
        {
            @SuppressWarnings("unused")
            Date date=df.parse(orig_data[0]);
            String myyear=orig_data[0].substring(6, 10);
            context.write(new TextpairWritable(new Text(myyear),new Text(orig_data[2])),new IntWritable(Integer.parseInt(orig_data[1])));
        }
        catch(ParseException e)
        {

            log.info("Date is not correct"+e);
        }
    }
}
Reducer:
package accidentexercise;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class ClassReducer extends Reducer<TextpairWritable,IntWritable,TextpairWritable,IntWritable>
{
    public void reduce(TextpairWritable key,Iterable<IntWritable> value,Context context) throws IOException,InterruptedException
    {
        int count=0;
        for(IntWritable it:value)
        {
            count+=it.get();
        }
        context.write(key,new IntWritable(count));
    }

}
Driver:
package accidentexercise;


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class ClassDriver {
    public static void main(String args[]) throws Exception
    {
        if(args.length!=2)
            {
            System.err.println("Usage: Worddrivernewapi <input path> <output path>");
            System.exit(-1);
            }
        Job job=new Job();

        job.setJarByClass(ClassDriver.class);
        job.setJobName("MyDriver");

        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.setMapperClass(ClassMapper.class);
        job.setPartitionerClass(ClassPartitioner.class);
        job.setSortComparatorClass(ClassSortComparator.class);
        job.setGroupingComparatorClass(ClassGroupComparator.class);
        job.setReducerClass(ClassReducer.class);
        //job.setNumReduceTasks(0);

        job.setOutputKeyClass(TextpairWritable.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
Partitioner:
package accidentexercise;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Partitioner;

public class ClassPartitioner extends Partitioner<TextpairWritable,IntWritable>
{

    @Override
    public int getPartition(TextpairWritable tp, IntWritable value, int numPartitions) {

        return Math.abs(Integer.parseInt(tp.getyear().toString()) * 127) % numPartitions;   
    }


}

Sample Input:

Date,Number_of_accidents,day

01/03/2014,18,2

02/03/2014,19,3

03/03/2014,20,4

01/03/2014,1,2

02/03/2014,2,3

03/03/2014,4,4

01/03/2014,8,2

02/03/2014,9,3

03/03/2014,2,4

Output:

01/03/2014,2,27

02/03/2014,3,30

03/03/2014,4,26