Extend SequenceFileInputFormat to include file name+offset

1.3k views Asked by At

I would like to be able to create a custom InputFormat that reads sequence files, but additionally exposes the file path and offset within that file where the record is located.

To take a step back, here's the use case: I have a sequence file containing variably-sized data. The keys are mostly irrelevant, and the values are up to a couple megabytes containing a variety of different fields. I would like to index some of these fields in elasticsearch along with the file name and offset. This way, I can query those fields from elasticsearch, and then use the file name and offset to go back to the sequence file and obtain the original record, instead of storing the whole thing in ES.

I have this whole process working as a single java program. The SequenceFile.Reader class conveniently gives getPosition and seek methods to make this happen.

However, there will eventually be many terabytes of data involved, so I will need to convert this to a MapReduce job (probably Map-only). Since the actual keys in the sequence file are irrelevant, the approach I had hoped to take would be to create a custom InputFormat that extends or somehow utilizes the SquenceFileInputFormat, but instead of returning the actual keys, instead returns a composite key consisting of the file and offset.

However, that's proving to be more difficult in practice. It seems like it should be possible, but given the actual APIs and what's exposed, it's tricky. Any ideas? Maybe an alternative approach I should take?

1

There are 1 answers

0
Joe K On BEST ANSWER

In case anyone encounters a similar problem, here's the solution I came up with. I ended up simply duplicating some of the code in SequenceFileInputFormat/RecordReader and just modifying it. I had hoped to write either a subclass or a decorator or something... this way is not pretty, but it works:

SequenceFileOffsetInputFormat.java:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> {

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> {

        private SequenceFile.Reader in;
        private long start;
        private long end;
        private boolean more = true;
        private PathOffsetWritable key = null;
        private Writable k = null;
        private V value = null;
        private Configuration conf;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            conf = context.getConfiguration();
            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            this.in = new SequenceFile.Reader(fs, path, conf);
            try {
                this.k = (Writable) in.getKeyClass().newInstance();
                this.value = (V) in.getValueClass().newInstance();
            } catch (InstantiationException e) {
                throw new IOException(e);
            } catch (IllegalAccessException e) {
                throw new IOException(e);
            }
            this.end = fileSplit.getStart() + fileSplit.getLength();

            if (fileSplit.getStart() > in.getPosition()) {
                in.sync(fileSplit.getStart());
            }

            this.start = in.getPosition();
            more = start < end;

            key = new PathOffsetWritable(path, start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!more) {
                return false;
            }
            long pos = in.getPosition();

            more = in.next(k, value);
            if (!more || (pos >= end && in.syncSeen())) {
                key = null;
                value = null;
                more = false;
            } else {
                key.setOffset(pos);
            }
            return more;
        }

        @Override
        public PathOffsetWritable getCurrentKey() {
            return key;
        }

        @Override
        public V getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (end == start) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
            }
        }

        @Override
        public void close() throws IOException {
            in.close();
        }

    }

    @Override
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new SequenceFileOffsetRecordReader<V>();
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context);
    }

    @Override
    public long getFormatMinSplitSize() {
        return SequenceFile.SYNC_INTERVAL;
    }


}

PathOffsetWritable.java:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> {

    private Text t = new Text();
    private Path path;
    private long offset;

    public PathOffsetWritable(Path path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    public Path getPath() {
        return path;
    }

    public long getOffset() {
        return offset;
    }

    public void setPath(Path path) {
        this.path = path;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        t.readFields(in);
        path = new Path(t.toString());
        offset = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        t.set(path.toString());
        t.write(out);
        out.writeLong(offset);
    }

    @Override
    public int compareTo(PathOffsetWritable o) {
        int x = path.compareTo(o.path);
        if (x != 0) {
            return x;
        } else {
            return Long.valueOf(offset).compareTo(Long.valueOf(o.offset));
        }
    }


}