Number of parallel mapper tasks in Hadoop Streaming job


I'm just starting to learn about Hadoop. I'm trying to use the streaming interface in conjunction with a Python script that processes files: for each input file I create an output file with some information about it, so this is a map job with no reducer. What I'm finding is that files are being processed one at a time, which isn't quite what I'd wanted.

I'll explain what I've done, but I'll also post some code afterwards in case there's something I'm missing there.

I've got an input format and record reader that reads whole files and uses their content as values and file names as keys. (The files aren't huge.) On the other end, I've got an output format and record writer that writes out values to files with names based on the keys. I'm using -io rawbytes and my Python script knows how to read and write key/value pairs.
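
For context, "rawbytes" means that each key and each value arrives on stdin as a 4-byte big-endian length followed by that many raw bytes, and results are written back to stdout the same way; that's what the script below implements by hand. An equivalent sketch using struct (the helper names here are my own):

import struct

def read_frame(stream):
    # a rawbytes record is a 4-byte big-endian length prefix followed by the payload
    header = stream.read(4)
    if len(header) < 4:
        return None  # end of input
    (length,) = struct.unpack(">i", header)
    return stream.read(length)

def write_frame(stream, payload):
    stream.write(struct.pack(">i", len(payload)))
    stream.write(payload)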

It all works fine, in terms of producing the output I'm expecting. If I run with, e.g., 10 input files I get 10 splits. That means that each time my script runs it only gets one key/value pair - which isn't ideal, but it's not a big deal, and I can see that this might be unavoidable. What's less good is that there is only one running instance of the script at any one time. Setting mapreduce.job.maps doesn't make any difference (although I vaguely remember seeing something about this value only being a suggestion, so perhaps Hadoop is making a different decision). What am I missing?
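
As an aside, a cheap way to confirm how many pairs each task actually sees would be to bump a counter from inside the script: Streaming treats stderr lines of the form reporter:counter:<group>,<counter>,<amount> as counter updates. A minimal sketch, not something the job below actually does:

import sys

def note_pair_seen():
    # Streaming interprets stderr lines in exactly this format as counter updates,
    # so the per-task totals show up with the rest of the job counters
    sys.stderr.write("reporter:counter:Debug,PairsSeenByMapper,1\n")
    sys.stderr.flush()

Calling that once per pair inside read_keys_and_values would make the one-pair-per-task behaviour visible in the job UI.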

Here's my code. First, the driver script:

#!/bin/bash

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -libjars mimi.jar \
    -D mapreduce.job.reduces=0 \
    -files rawbytes_mapper.py,irrelevant.py \
    -inputformat "mimi.WholeFileInputFormat" \
    -outputformat "mimi.NamedFileOutputFormat" \
    -io rawbytes \
    -mapper "rawbytes_mapper.py irrelevant blah blah blah" \
    -input "input/*.xml" \
    -output output

rawbytes_mapper.py:

#!/usr/bin/python

def read_raw_bytes(input):
    length_bytes = input.read(4)
    if len(length_bytes) < 4:
        return None
    length = 0
    for b in length_bytes:
        length = (length << 8) + ord(b)
    return input.read(length)

def write_raw_bytes(output, s):
    length = len(s)
    length_bytes = []
    for _ in range(4):
        length_bytes.append(chr(length & 0xff))
        length = length >> 8
    length_bytes.reverse()
    for b in length_bytes:
        output.write(b)
    output.write(s)

def read_keys_and_values(input):
    d = {}
    while True:
        key = read_raw_bytes(input)
        if key is None: break
        value = read_raw_bytes(input)
        d[key] = value
    return d

def write_keys_and_values(output, d):
    for key in d:
        write_raw_bytes(output, key)
        write_raw_bytes(output, d[key])

if __name__ == "__main__":
    import sys
    module = __import__(sys.argv[1])
    before = read_keys_and_values(sys.stdin)
    module.init(sys.argv[2:])
    after = module.process(before)
    write_keys_and_values(sys.stdout, after)
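
For completeness, the module named on the mapper command line (irrelevant in the job above) only needs to expose init and process, because that's all the wrapper calls. A sketch of that interface; only the two function names come from the wrapper, the body is made up:

# irrelevant.py (illustrative body only)
def init(args):
    # args is everything after the module name on the -mapper command line
    pass

def process(pairs):
    # pairs maps input file name -> file contents;
    # return a dict of output file name -> output bytes
    return dict((name, "%s: %d bytes\n" % (name, len(contents)))
                for name, contents in pairs.items())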

WholeFileInputFormat.java:

package mimi;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileInputFormat extends FileInputFormat<BytesWritable, BytesWritable>
{
    private static class WholeFileRecordReader implements RecordReader<BytesWritable, BytesWritable>
    {
        private FileSplit split;
        private JobConf conf;
        private boolean processed = false;

        public WholeFileRecordReader(FileSplit split, JobConf conf)
        {
            this.split = split;
            this.conf = conf;
        }

        @Override
        public BytesWritable createKey()
        {
            return new BytesWritable();
        }

        @Override
        public BytesWritable createValue()
        {
            return new BytesWritable();
        }

        @Override
        public boolean next(BytesWritable key, BytesWritable value) throws IOException
        {
            if (processed)
            {
                return false;
            }

            byte[] contents = new byte[(int) split.getLength()];
            Path file = split.getPath();
            String name = file.getName();
            byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
            key.set(bytes, 0, bytes.length);
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try
            {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            }
            finally
            {
                IOUtils.closeStream(in);
            }

            processed = true;
            return true;
        }

        @Override
        public float getProgress() throws IOException
        {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public long getPos() throws IOException
        {
            return processed ? split.getLength() : 0L;
        }

        @Override
        public void close() throws IOException
        {
            // do nothing
        }
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path file)
    {
        return false;
    }

    @Override
    public RecordReader<BytesWritable, BytesWritable> getRecordReader(InputSplit split,
                                                                      JobConf conf,
                                                                      Reporter reporter)
    throws IOException
    {
        return new WholeFileRecordReader((FileSplit) split, conf);
    }
}

NamedFileOutputFormat.java:

package mimi;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class NamedFileOutputFormat extends MultipleOutputFormat<BytesWritable, BytesWritable>
{
    private static class BytesValueWriter implements RecordWriter<BytesWritable, BytesWritable>
    {
        FSDataOutputStream out;

        BytesValueWriter(FSDataOutputStream out)
        {
            this.out = out;
        }

        @Override
        public synchronized void write(BytesWritable key, BytesWritable value) throws IOException
        {
            out.write(value.getBytes(), 0, value.getLength());
        }

        @Override
        public void close(Reporter reporter) throws IOException
        {
            out.close();
        }
    }

    @Override
    protected String generateFileNameForKeyValue(BytesWritable key, BytesWritable value, String name)
    {
        return new String(key.getBytes(), 0, key.getLength(), StandardCharsets.UTF_8);
    }

    @Override
    public RecordWriter<BytesWritable, BytesWritable> getBaseRecordWriter(FileSystem ignored,
                                                                          JobConf conf,
                                                                          String name,
                                                                          Progressable progress)
    throws IOException
    {
        Path file = FileOutputFormat.getTaskOutputPath(conf, name);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream out = fs.create(file, progress);
        return new BytesValueWriter(out);
    }
}
1 Answer

Answer from Bartosz Kotwica:

I think I can help you with this part of your problem:

each time my script runs it only gets one key/value pair - which isn't ideal

If the isSplitable method returns false, only one file will be processed per mapper. So if you don't override isSplitable and leave it returning true, you should get more than one key/value pair in one mapper. In your case each file is a single key/value pair, so the files can't be split even when isSplitable returns true.

I can't figure out why only one mapper runs at a time, but I'm still thinking about it :)