spark repartition data for small file

I am pretty new to Spark and I am using a cluster mainly for parallelization purposes. I have a 100 MB file, each line of which is processed by some algorithm, and that processing is quite heavy and long-running.

I want to use a 10-node cluster and parallelize the processing. I know the HDFS block size is more than 100 MB, so the file sits in a single block, and I tried to repartition the textFile. If I understand correctly, this repartition method increases the number of partitions:

JavaRDD<String> input = sc.textFile(args[0]);
input.repartition(10);

The issue is that when I deploy to the cluster, only a single node does any work. How can I get the file processed in parallel?
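
For reference, textFile itself also accepts a minimum-partition hint, so the split can already happen at read time, provided the file is stored in a splittable (e.g. uncompressed) format; a minimal sketch, assuming the same args[0] input path:

// Hint at least 10 partitions at read time; only effective for splittable input
JavaRDD<String> input = sc.textFile(args[0], 10);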

Update 1: here's my spark-submit command:

/usr/bin/spark-submit --master yarn --class mypackage.myclass --jars myjar.jar gs://mybucket/input.txt outfile
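
Note that the command above does not request any particular amount of resources from YARN; without dynamic allocation, Spark starts only a couple of executors by default. A sketch with an explicit request might look like the following (the executor count, cores, and memory are placeholders, and myjar.jar is assumed to be the application jar):

/usr/bin/spark-submit --master yarn --class mypackage.myclass \
    --num-executors 10 --executor-cores 1 --executor-memory 2g \
    myjar.jar gs://mybucket/input.txt outfile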

Update 2: After the repartition, there are basically two operations:

JavaPairRDD<String, String> int_input = mappingToPair(input);
JavaPairRDD<String, String> output = mappingValues(int_input, option);
output.saveAsTextFile("hdfs://...");

where mappingToPair(...) is

public JavaPairRDD<String, String> mappingToPair(JavaRDD<String> input) {
    return input.mapToPair(new PairFunction<String, String, String>() {
        public Tuple2<String, String> call(String line) {
            // Split each line on the first tab into a (key, value) pair
            String[] arrayList = line.split("\t", 2);
            return new Tuple2<String, String>(arrayList[0], arrayList[1]);
        }
    });
}

and mappingValues(...) is a method of the following type:

public JavaPairRDD<String, String> mappingValues(JavaPairRDD<String, String> rdd, final String option) {
    return rdd.mapValues(
            new Function<String, String>() {
                public String call(String value) {
                    // here the algo processing takes place
                    // and the processed String is returned...
                }
            }
    );
}
1 Answer

Daniel Zolnai (accepted answer):

There could be multiple issues here:

  1. The file is only one block big. Reading it with multiple executors does not help, since the HDFS node can serve one reader at full speed, or two readers at half speed each (plus overhead), etc. Executor count becomes useful (for the read step) when you have multiple blocks scattered across different HDFS nodes.
  2. It is also possible that you are storing the file in a non-splittable compressed format, in which case the input can only be read by one executor, even if the file were 100 times the block size.
  3. You do not chain the repartition(10) call into your flow, so it has no effect at all. If you replace this line: input.repartition(10); with this one: input = input.repartition(10); the repartition will be used, and the RDD will be split across multiple partitions before the next step (see the sketch after this list).
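
Putting point 3 together with the code from the question, a minimal sketch of the corrected flow (same method names as in Update 2):

JavaRDD<String> input = sc.textFile(args[0]);
input = input.repartition(10); // reassign: repartition returns a new RDD rather than modifying the old one
JavaPairRDD<String, String> int_input = mappingToPair(input);
JavaPairRDD<String, String> output = mappingValues(int_input, option);
output.saveAsTextFile("hdfs://...");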

Please note that repartitioning can make your process even longer, since the data has to be split and transferred to other machines, which can easily be bottlenecked by a slow network.

This is especially the case when you use the client deploy mode. This means that the driver is the local Spark instance on the machine you submit from, so it will first download all the data from the cluster to the driver, and then upload it back to the other YARN nodes after the repartitioning.
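
If that round trip is a concern, the driver can instead run inside the cluster by switching the deploy mode; a minimal sketch, reusing the command from Update 1 (and assuming myjar.jar is the application jar):

/usr/bin/spark-submit --master yarn --deploy-mode cluster --class mypackage.myclass myjar.jar gs://mybucket/input.txt outfile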

I could go on about this, but the main point is: if your algorithm is very simple, the process might even run faster on a single executor than with partitioning, transferring, and then running it on all executors in parallel.