Implementation of CombineFileInputFormat for Hadoop 0.20.205

Can someone please point out where I could find an implementation of CombineFileInputFormat (org.apache.hadoop.mapred.lib.CombineFileInputFormat) for Hadoop 0.20.205? The goal is to create large splits from very small log files (line-oriented text) using EMR.

It is surprising that Hadoop does not ship a default implementation of this class for exactly this purpose, and judging by search results I'm not the only one confused by that. I need to compile the class and bundle it in a jar for hadoop-streaming, which is quite a challenge with my limited knowledge of Java.

Edit: I already tried the yetitrails example, with the necessary imports added, but I get a compiler error for the next() method.

1 Answer

Amar (best answer):

Here is an implementation I have for you:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

        // CombineFileRecordReader instantiates one MyCombineFileRecordReader per file chunk via reflection
        return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) MyCombineFileRecordReader.class);
    }

    // Wraps a standard LineRecordReader around one file chunk of the combined
    // split; the Integer argument is the chunk's index within the CombineFileSplit.
    public static class MyCombineFileRecordReader implements RecordReader<LongWritable, Text> {
        private final LineRecordReader linerecord;

        public MyCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
            // Build a FileSplit for the single file at this index of the combined split
            FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
            linerecord = new LineRecordReader(conf, filesplit);
        }

        // All RecordReader methods simply delegate to the wrapped LineRecordReader

        @Override
        public void close() throws IOException {
            linerecord.close();
        }

        @Override
        public LongWritable createKey() {
            return linerecord.createKey();
        }

        @Override
        public Text createValue() {
            return linerecord.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return linerecord.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return linerecord.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return linerecord.next(key, value);
        }

    }
}

In your job, first set the parameter mapred.max.split.size to the maximum size (in bytes) you would like the small input files to be combined into. Do something like the following in your run():

...
            if (argument != null) {
                conf.set("mapred.max.split.size", argument);
            } else {
                conf.set("mapred.max.split.size", "134217728"); // 128 MB
            }
...

            conf.setInputFormat(CombinedInputFormat.class);
...
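
For completeness, here is a minimal stand-alone driver sketch showing how the two fragments above fit together in the old mapred API. The class name CombineDriver and the identity mapper/reducer are placeholders for illustration, not part of the original answer:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

@SuppressWarnings("deprecation")
public class CombineDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), CombineDriver.class);
        conf.setJobName("combine-small-files");

        // Combine small input files into splits of at most ~128 MB each
        conf.set("mapred.max.split.size", "134217728");
        conf.setInputFormat(CombinedInputFormat.class);

        // Placeholder identity mapper/reducer; substitute your own job logic
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CombineDriver(), args));
    }
}

Note that mapred.max.split.size is the knob CombineFileInputFormat reads to cap how much data is packed into each combined split; without it, far more small files may be collapsed into a single split than you intend.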