How to put files into memory using the Hadoop distributed cache?


As far as I know, the distributed cache copies files to every node, and then each map or reduce task reads the files from the local file system.

My question is: is there a way to put our files into memory using the Hadoop distributed cache, so that every map or reduce task can read the files directly from memory?

My MapReduce program distributes a PNG picture of about 1 MB to every node; every map task then reads the picture from the distributed cache and does some image processing together with another picture taken from the map input.
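To make this concrete, the mapper side currently looks roughly like the sketch below (ImageMapper, the file handling, and the processing step are placeholders; the picture itself is registered in the driver with DistributedCache.addCacheFile()):

import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;

import javax.imageio.ImageIO;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ImageMapper extends Mapper<Object, Text, Text, IntWritable> {

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // The distributed cache has already copied the picture to this node's local disk,
    // so every call to map() re-reads it from the local file system.
    Path[] localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    BufferedImage picture = ImageIO.read(new File(localFiles[0].toString()));
    // ... image processing combining picture with the input of this map call ...
  }
}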


There are 2 answers

user On
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

      // Local paths of the files that the distributed cache copied to this node
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

      // Note: the cache file is re-read here for every input record;
      // reading it once in setup() would be cheaper.
      try {
        BufferedReader readBuffer1 = new BufferedReader(new FileReader(cacheFiles[0].toString()));
        String line;
        while ((line = readBuffer1.readLine()) != null) {
          System.out.println(line);
        }
        readBuffer1.close();
      } catch (Exception e) {
        System.out.println(e.toString());
      }

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Debug output: length of the current key
      int length = key.getLength();
      System.out.println("length " + length);
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {

    final String NAME_NODE = "hdfs://localhost:9000";
    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Ship dataset1.txt from HDFS to every node via the distributed cache
    DistributedCache.addCacheFile(new URI(NAME_NODE + "/dataset1.txt"),
        job.getConfiguration());

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
Saket On

Great question. I am also trying to solve a similar issue. I don't think Hadoop supports an in-memory cache out of the box. However, it should not be very difficult to set up another in-memory cache somewhere on the grid for this purpose. We can pass the location of the cache and the name of the parameter as part of the job Configuration.
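As a minimal sketch of that last point (the property name in.memory.cache.location and the endpoint are made up), the driver could publish the cache location in the job configuration and every task could read it back:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheLocationExample {
  public static void main(String[] args) throws Exception {
    // Driver side: record where the external in-memory cache can be reached.
    Configuration conf = new Configuration();
    conf.set("in.memory.cache.location", "cachehost:11211");
    Job job = new Job(conf, "image processing");

    // Task side (inside map() or reduce()), the same value is available via:
    //   context.getConfiguration().get("in.memory.cache.location");
  }
}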

As far as the code example above is concerned, it does not answer the original question. In addition, it showcases a non-optimal code sample. Ideally, you should read the cache file in the setup() method and cache any information you may want to use in the map() method. In the example above, the cache file is read once for every key-value pair, which hurts the performance of the MapReduce job.
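As a sketch of that pattern (using the same old-style DistributedCache call as the code above; class and field names are placeholders), the file is read once per task in setup() and kept in memory for all subsequent map() calls:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SetupCachingMapper extends Mapper<Object, Text, Text, IntWritable> {

  private final List<String> cachedLines = new ArrayList<String>();

  @Override
  protected void setup(Context context) throws IOException {
    // Runs once per task attempt, before any map() call.
    Path[] localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    BufferedReader reader = new BufferedReader(new FileReader(localFiles[0].toString()));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        cachedLines.add(line);   // held in memory for the lifetime of the task
      }
    } finally {
      reader.close();
    }
  }

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // Use cachedLines here instead of re-opening the cache file for every record.
  }
}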