Hadoop RawLocalFileSystem and getPos


I've found that getPos() in the RawLocalFileSystem's input stream can throw a NullPointerException if its underlying stream is closed.

I discovered this when playing with a custom record reader.

To patch it, I check whether a call to "stream.available()" throws an exception and, if so, return 0 from getPos().
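The workaround can be sketched roughly as follows. This is a minimal, stdlib-only illustration, not the Hadoop code: `GuardedPosStream` is a hypothetical stand-in for the position-tracking input stream, and it assumes that the wrapped stream's `available()` throws an `IOException` once the stream is closed (true for `BufferedInputStream`, but not for every `InputStream`).

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for a position-tracking input stream; not a Hadoop class.
class GuardedPosStream {
    private final InputStream stream;
    private long position = 0;

    GuardedPosStream(InputStream stream) {
        this.stream = stream;
    }

    int read() throws IOException {
        int b = stream.read();
        if (b >= 0) {
            position++; // count only bytes that were actually returned
        }
        return b;
    }

    void close() throws IOException {
        stream.close();
    }

    // Mirrors the workaround described above: probe the stream with
    // available() and report 0 if the probe fails (e.g. stream closed).
    long getPos() {
        try {
            stream.available();
            return position;
        } catch (IOException e) {
            return 0L;
        }
    }

    public static void main(String[] args) throws IOException {
        GuardedPosStream s = new GuardedPosStream(
                new BufferedInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3})));
        s.read();
        s.read();
        System.out.println(s.getPos()); // position while the stream is open
        s.close();
        System.out.println(s.getPos()); // falls back to 0 once closed
    }
}
```

Returning 0 hides the last position that was actually reached; returning the last tracked position instead might be a reasonable alternative, depending on what callers expect after close.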

The existing getPos() implementation is found here:

https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java

What should be the correct behaviour of getPos() in the RecordReader?

There is 1 answer

Answered by jayunit100

The "getPos" in the RecordReader has changed over time.

In the old mapred RecordReader implementations, it was used to count bytes read.

  /** 
   * Returns the current position in the input.
   * 
   * @return the current position in the input.
   * @throws IOException
   */
  long getPos() throws IOException;

In the newer mapreduce RecordReader implementations, the position is not exposed by the RecordReader class itself; instead, it is tracked by the FSInputStream implementations, such as the one in RawLocalFileSystem:

class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
  private FileInputStream fis;
  private long position;

  public LocalFSFileInputStream(Path f) throws IOException {
    this.fis = new TrackingFileInputStream(pathToFile(f));
  }

  @Override
  public void seek(long pos) throws IOException {
    // Move the underlying channel, then remember the new position.
    fis.getChannel().position(pos);
    this.position = pos;
  }

  @Override
  public long getPos() throws IOException {
    // Returns the tracked field; does not query the underlying stream.
    return this.position;
  }

  // (remaining methods omitted from this excerpt)
}

Thus, with the new mapreduce API, the RecordReader abstraction no longer requires a getPos() method. Newer RecordReader implementations that still need the position can be written to use the underlying FSInputStream objects directly, since those do provide getPos().
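To illustrate the idea of a reader delegating position tracking to its stream, here is a minimal stdlib-only sketch. `PositionTrackingStream` and `LineReaderSketch` are hypothetical names standing in for an FSInputStream and a custom RecordReader; they are not Hadoop classes, and the sketch reads one byte at a time so that the stream position always matches what the reader has consumed.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for an FSInputStream: counts the bytes consumed so far.
class PositionTrackingStream extends FilterInputStream {
    private long position = 0;

    PositionTrackingStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b >= 0) {
            position++;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            position += n;
        }
        return n;
    }

    @Override
    public long skip(long n) throws IOException {
        long skipped = super.skip(n);
        position += skipped;
        return skipped;
    }

    long getPos() {
        return position;
    }
}

// Hypothetical reader: keeps no byte counter of its own and asks the stream instead.
class LineReaderSketch {
    private final PositionTrackingStream in;
    private final long length; // total number of bytes expected in the input

    LineReaderSketch(PositionTrackingStream in, long length) {
        this.in = in;
        this.length = length;
    }

    // Returns the next line without its '\n' terminator, or null at end of input.
    String nextLine() throws IOException {
        StringBuilder sb = new StringBuilder();
        boolean sawAny = false;
        int b;
        while ((b = in.read()) >= 0) {
            sawAny = true;
            if (b == '\n') {
                break;
            }
            sb.append((char) b); // ASCII-only for brevity
        }
        return sawAny ? sb.toString() : null;
    }

    long getPos() {
        return in.getPos();
    }

    float getProgress() {
        return length == 0 ? 1.0f : Math.min(1.0f, (float) in.getPos() / length);
    }
}
```

With a buffered reader in between, the stream position would run ahead of the records actually returned, so a real implementation would need to account for buffered but unconsumed bytes.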