Spark job seems not to parallelize well


Using Spark 1.1

I have a job that does the following:

  1. Reads the list of folders under a given root and parallelizes the list
  2. For each folder, reads the files under it - these are gzipped files
  3. For each file, extracts the content - these are lines, where each line represents a single event with tab-separated fields (TSV)
  4. Creates a single RDD of all the lines
  5. Converts the TSV to JSON

(At this point the lines represent events of a certain type. There are 4 types: session, request, recommendation, and user event.)

  6. Filters for session events only, and samples only 1:100 of them according to some user id field. Converts them to pairs, with a key that represents some output structure (like: event type/date/the events), then writes them to the FS. (A rough sketch of the sampling is shown right after these steps.)
  7. Does the same for requests and user events

(For recommendations, the sampling cannot be done according to user id, since it does not exist there, but we know there is a 1:1 relationship between requests and recommendations based on a mutual request id field. So:)

  8. Creates a list of distinct request ids, joins this list with the recommendations using the request id as the key, and thereby achieves the filtering we want. Then outputs the reduced list to the FS.
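
For illustration, the 1:100 session sampling (it happens inside my convert helper, which is omitted from the code below) works roughly like this sketch - the helper name and hashing scheme are placeholders, not my exact code:

    // Illustrative sketch only - the real sampling lives inside the omitted
    // convert() helper. Assumes the user id is available as a string field.
    private static boolean keepSampledSession(String userId, int filterFactor) {
        // Deterministic 1:filterFactor sampling: keep the session only if its
        // user id hashes into bucket 0 of filterFactor buckets (e.g. 1 of 100).
        return (userId.hashCode() & Integer.MAX_VALUE) % filterFactor == 0;
    }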

Now, here is my problem. The code I use to do all of this works at small scale. But when I run it on relatively large input, using a cluster of 80 machines with 8 cores and 50GB of memory each, I can see that many machines are underutilized: only one core is occupied (and only at ~20%), and memory usage sits at about 16GB out of the 40GB configured for the job.

I think my transformations do not get parallelized well somewhere, but I am not sure where or why. Here is most of my code (I have omitted some auxiliary functions that I think are irrelevant to the problem):

 public static void main(String[] args) {

    BasicConfigurator.configure();

    conf[0] = new Conf("local[4]");
    conf[1] = new Conf("spark://hadoop-m:7077");
    Conf configuration = conf[1];

    if (args.length != 4) {
        log.error("Error in parameters. Syntax: <input path> <output_path> <filter_factor> <locality>\nfilter_factor is what fraction of sessions to process. For example, to process 1/100 of sessions, use 100\nlocality should be set to \"local\" in case running on local environment, and to \"remote\" otherwise.");
        System.exit(-1);
    }

    final String inputPath = args[0];
    final String outputPath = args[1];
    final Integer filterFactor;

    if (args[3].equals("local")) {
        configuration = conf[0];
    }

    log.setLevel(Level.DEBUG);
    Logger.getRootLogger().removeAppender("console");
    final SparkConf conf = new SparkConf().setAppName("phase0").setMaster(configuration.getMaster());
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.doit.customer.dataconverter.MyRegistrator");
    final JavaSparkContext sc = new JavaSparkContext(conf);
    if (configuration.getMaster().contains("spark:")) {
        sc.addJar("/home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar");
    }
    try {
        filterFactor = Integer.parseInt(args[2]);
        // read all folders from root
        Path inputPathObj = new Path(inputPath);
        FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
        FileStatus[] statusArr = fs.globStatus(inputPathObj);
        List<FileStatus> statusList = Arrays.asList(statusArr);

        List<String> pathsStr = convertFileStatusToPath(statusList);

        JavaRDD<String> paths = sc.parallelize(pathsStr);

        // read all files from each folder
        JavaRDD<String> filePaths = paths.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            @Override
            public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
                List<String> filesPath = new ArrayList<String>();
                if (pathsIterator != null) {
                    while (pathsIterator.hasNext()) {
                        String currFolder = pathsIterator.next();
                        Path currPath = new Path(currFolder);
                        FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                        FileStatus[] files = fs.listStatus(currPath);
                        List<FileStatus> filesList = Arrays.asList(files);
                        List<String> filesPathsStr = convertFileStatusToPath(filesList);
                        filesPath.addAll(filesPathsStr);
                    }
                }
                return filesPath;
            }
        });


        // Transform list of files to list of all files' content in lines
        JavaRDD<String> typedData = filePaths.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String filePath) throws Exception {
                try {
                    String fileType = null;
                    List<String> linesList = new ArrayList<String>();
                    Configuration conf = new Configuration();
                    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                    Path path = new Path(filePath);
                    fileType = getType(path.getName());

                    // filter non-trc files
                    if (!path.getName().startsWith("1")) {
                        return linesList;
                    }

                    CompressionCodec codec = compressionCodecs.getCodec(path);
                    FileSystem fs = path.getFileSystem(conf);
                    InputStream in = fs.open(path);
                    if (codec != null) {
                        in = codec.createInputStream(in);
                    } else {
                        throw new IOException("No compression codec found for file " + path);
                    }

                    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);

                    // Read the first line without adding it to the list -
                    // this filters out the header row
                    String line = r.readLine();

                    // Read all lines
                    while ((line = r.readLine()) != null) {
                        try {
                            String sliceKey = getSliceKey(line, fileType);
                            // Adding event type and output slice key as additional fields
                            linesList.add(fileType + "\t" + sliceKey + "\t" + line);
                        } catch(ParseException e) {
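                            // Skip lines whose slice key cannot be parsed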
                        }
                    }

                    return linesList;
                } catch (Exception e) { // Filtering of files whose reading went wrong
                    log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                    return new ArrayList();
                }
            }
            // flatten to one big list with all the lines
        }).flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterable<String> call(List<String> strings) throws Exception {
                return strings;
            }
        });

        // convert tsv to json

        JavaRDD<ObjectNode> jsons = typedData.mapPartitions(new FlatMapFunction<Iterator<String>, ObjectNode>() {
            @Override
            public Iterable<ObjectNode> call(Iterator<String> stringIterator) throws Exception {
                List<ObjectNode> res = new ArrayList<>();
                while(stringIterator.hasNext()) {
                    String currLine = stringIterator.next();
                    Iterator<String> i = Splitter.on("\t").split(currLine).iterator();
                    if (i.hasNext()) {
                        String type = i.next();
                        ObjectNode json = convert(currLine, type, filterFactor);
                        if(json != null) {
                            res.add(json);
                        }
                    }
                }
                return res;
            }
        }).cache();


        createOutputType(jsons, "Session", outputPath, null);
        createOutputType(jsons, "UserEvent", outputPath, null);
        JavaRDD<ObjectNode> requests = createOutputType(jsons, "Request", outputPath, null);


        // Now leave only the set of request ids - to inner join with the recommendations
        JavaPairRDD<String,String> requestsIds = requests.mapToPair(new PairFunction<ObjectNode, String, String>() {
            @Override
            public Tuple2<String, String> call(ObjectNode jsonNodes) throws Exception {
                String id = jsonNodes.get("id").asText();
                return new Tuple2<String, String>(id,id);
            }
        }).distinct();

        createOutputType(jsons,"RecommendationList", outputPath, requestsIds);

    } catch (IOException e) {
        log.error(e);
        System.exit(1);
    } catch (NumberFormatException e) {
        log.error("filter factor is not a valid number!!");
        System.exit(-1);
    }

    sc.stop();

}

private static JavaRDD<ObjectNode> createOutputType(JavaRDD<ObjectNode> jsonsList, final String type, String outputPath, JavaPairRDD<String, String> joinKeys) {

    outputPath = outputPath + "/" + type;

    JavaRDD<ObjectNode> events = jsonsList.filter(new Function<ObjectNode, Boolean>() {
        @Override
        public Boolean call(ObjectNode jsonNodes) throws Exception {
            return jsonNodes.get("type").asText().equals(type);
        }
    });


    // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :)
    if(joinKeys != null) {
        JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
            @Override
            public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
                return new Tuple2<String, ObjectNode>(jsonNodes.get("requestId").asText(),jsonNodes);
            }
        });

        JavaRDD<ObjectNode> joinedEvents = joinKeys.join(keyedEvents).values().map(new Function<Tuple2<String, ObjectNode>, ObjectNode>() {
           @Override
           public ObjectNode call(Tuple2<String, ObjectNode> stringObjectNodeTuple2) throws Exception {
               return stringObjectNodeTuple2._2;
           }
        });
        events = joinedEvents;
    }


    JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
        @Override
        public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
            return new Tuple2<String, ObjectNode>(jsonNodes.get("sliceKey").asText(),jsonNodes);
        }
    }).groupByKey();
    // Convert the jsons to strings and add "\n" at the end of each

    JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new PairFunction<Tuple2<String, Iterable<ObjectNode>>, String, String>() {
        @Override
        public Tuple2<String, String> call(Tuple2<String, Iterable<ObjectNode>> content) throws Exception {
            String string = jsonsToString(content._2);
            log.error(string);
            return new Tuple2<>(content._1, string);
        }
    });
    groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class);
    return events;
}

Notice the special case of if (joinKeys != null), in which I join the recommendations with the request ids.

Finally, the command I use to start the Spark job is:

spark-submit --class com.doit.customer.dataconverter.Phase0 --driver-cores 8 --total-executor-cores 632 --driver-memory 40g --executor-memory 40G --deploy-mode cluster /home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar gs://input/2014_07_31* gs://output/2014_07_31 100 remote

1 Answer

Best answer, by Jim McBeath:

Your initial partitions are based on the set of folders in your root (sc.parallelize(pathsStr)). There are two steps in your flow that can significantly unbalance your partitions: 1) reading the list of files within each folder, if some folders have many more files than other folders; 2) reading the TSV lines from each file, if some files have many more lines than others.

If your files are roughly the same size, but you have many more in some folders than others, you can rebalance your partitions after you collect the filenames. After setting the initial value for filePaths, try adding this line:

    filePaths = filePaths.repartition(sc.defaultParallelism());

This will shuffle the collected filenames into balanced partitions.

If you have an imbalance due to some files being significantly larger than others, you can try rebalancing your typedData RDD by similarly invoking repartition on it, although this will be much more expensive, as it will shuffle all of your TSV data.
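
For example, applying the same call to the typedData RDD:

    typedData = typedData.repartition(sc.defaultParallelism());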

Alternatively, if you rebalance filePaths and still have some partition imbalance caused by having a number of somewhat larger files that end up in a few partitions, you might be able to get a bit better performance by using a larger number in the repartition argument, such as multiplying by four so that you get four times as many partitions as there are cores. This will increase the communication cost a bit, but might be a win if it provides better balancing of the resulting partition sizes in typedData.
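
For example (the factor of four here is just a starting point to tune):

    filePaths = filePaths.repartition(sc.defaultParallelism() * 4);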