Reading a record broken across two lines because of \n in MapReduce


I am trying to write a custom record reader that reads a record with a defined number of fields, where the record spans two lines.

For example:

1,2,3,4    (the "," at the line break may or may not be there)
,5,6,7,8

My requirement is to read the record and push it into the mapper as a single record like {1,2,3,4,5,6,7,8}. Please give me some pointers.

UPDATE:

public boolean nextKeyValue() throws IOException, InterruptedException {
    if(key == null) {
        key = new LongWritable();
    }

    //Current offset is the key
    key.set(pos); 

    if(value == null) {
        value = new Text();
    }

    int newSize = 0;
    int numFields = 0;
    Text temp = new Text();
    boolean firstRead = true;

    while(numFields < reqFields) {
        while(pos < end) {
            //Read up to the '\n' character and store it in 'temp'
            newSize = in.readLine(  temp, 
                                    maxLineLength, 
                                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), 
                                             maxLineLength));

            //If 0 bytes were read, then we are at the end of the split
            if(newSize == 0) {
                break;
            }

            //Otherwise update 'pos' with the number of bytes read
            pos += newSize;

            //If the line is not too long, check number of fields
            if(newSize < maxLineLength) {
                break;
            }

            //Line too long, try again
            LOG.info("Skipped line of size " + newSize + " at pos " + 
                        (pos - newSize));
        }

        //Exit, since we're at the end of split
        if(newSize == 0) {
            break;
        }
        else {
            String record = temp.toString();
            StringTokenizer fields = new StringTokenizer(record,"|");

            numFields += fields.countTokens();

            //Reset 'value' if this is the first append
            if(firstRead) {
                value = new Text();
                firstRead = false;
            }

            //Append this physical line to the record, whether or not the
            //record is complete yet (the two original branches were identical)
            value.append(temp.getBytes(), 0, temp.getLength());
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    }
    else {
        return true;
    }
}


This is the nextKeyValue method I am trying to get to work, but the mappers are still not getting proper values. reqFields is 4.


There are 2 answers

Accepted answer by Aviral Kumar:

The string had to be tokenized using StringTokenizer rather than String.split. The code has been updated with the new implementation.
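
For illustration, here is a small standalone example of the difference (the string below is just the second half of the record from the question; "," is assumed as the delimiter):

import java.util.StringTokenizer;

public class TokenCountDemo {
    public static void main(String[] args) {
        String part = ",5,6,7,8";   // second physical line of the broken record

        // String.split keeps the leading empty field, so the count comes out as 5
        System.out.println(part.split(",").length);                        // 5

        // StringTokenizer treats the delimiter purely as a separator, so the count is 4
        System.out.println(new StringTokenizer(part, ",").countTokens());  // 4

        // split() also takes a regex, so a "|" delimiter would have to be escaped
        System.out.println("1|2|3|4".split("\\|").length);                 // 4
    }
}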

Answer by miljanm:

Look at how TextInputFormat is implemented. Look at its superclass, FileInputFormat, as well. You must subclass either TextInputFormat or FileInputFormat and implement your own record handling.
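
Roughly, the boilerplate looks like this (MultiLineInputFormat and MultiLineRecordReader are placeholder names for your own classes, not anything that exists in Hadoop):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Placeholder input format that hands each mapper one logical record,
// even when that record spans two physical lines.
public class MultiLineInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                                                               TaskAttemptContext context) {
        // MultiLineRecordReader is your reader containing the nextKeyValue() above
        return new MultiLineRecordReader();
    }
}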

The thing to be aware of when implementing any kind of file input format is this:

The framework will split the file and give you the start offset and byte length of the piece of the file you have to read. It may very well happen that the split lands in the middle of a record. That is why your reader must skip the bytes of the partial record at the beginning of the split if that record is not fully contained in it, and must also read past the last byte of the split to finish the last record if that one is not fully contained either.

For example, TextInputFormat treats \n characters as record delimiters, so when it gets a split it skips bytes until the first \n character and reads past the end of the split until the next \n character.
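
Here is a rough sketch of that convention, simplified from what LineRecordReader does in initialize(); start, end, pos and in are assumed to be fields of your reader:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    start = split.getStart();
    end = start + split.getLength();

    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    FSDataInputStream fileIn = fs.open(file);
    fileIn.seek(start);
    in = new LineReader(fileIn, context.getConfiguration());

    // If this split does not begin at byte 0, the first "line" is almost certainly
    // the tail of a record owned by the previous split: throw it away and start at
    // the next record boundary.
    if (start != 0) {
        start += in.readLine(new Text(), 0,
                             (int) Math.min(Integer.MAX_VALUE, end - start));
    }
    pos = start;
}

// In nextKeyValue(), keep reading while pos <= end (not pos < end), so the record
// that straddles the end of the split is finished here and skipped by the next split.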

As for the code example:

You need to ask yourself the following question: say you open the file, seek to a random position and start reading forward. How do you detect the start of a record? I don't see anything in your code that deals with that, and without it you cannot write a good input format, because you don't know where the record boundaries are.
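
For this particular two-line format, one possible convention (an assumption on my part, nothing in your code defines it) is that a continuation line always starts with the field delimiter, e.g. ",5,6,7,8". Then, after seeking to a random offset, the reader can discard lines until it sees one that does not start with ",":

import org.apache.hadoop.io.Text;

// Hypothetical helper: a line that does NOT begin with the delimiter is assumed
// to start a new logical record; a line beginning with "," is a continuation.
final class RecordBoundary {
    static boolean startsNewRecord(Text line) {
        return line.getLength() > 0 && line.charAt(0) != ',';
    }
}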

It is still possible to make the input format read the whole file end to end by having the isSplitable(JobContext, Path) method return false. That forces the whole file to be read by a single map task, which reduces parallelism.
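
A minimal sketch of that, assuming the plain newline-based reading is otherwise fine for you:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Forbid splitting so each file is read end to end by a single map task.
public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}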

Your inner while loop also seems problematic: it detects lines that are too long and skips them. Since your records span multiple lines, skipping a line can leave you gluing part of one record onto part of another when you read.