Spark is creating too many threads when reading ML model

I have a multilabel text classification problem that I tried to solve with the binary relevance method, creating one binary classifier per label. After the training phase, I have to load 10,000 classifier models with Spark to run the classification phase on all my documents. For a reason I can't identify, loading becomes very slow once I read more than about 1,000 models: Spark seems to create a new thread on each load, which progressively slows the process down. Here is minimal code that illustrates the problem.

package entrepot.spark;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.sql.SparkSession;

public class maintest {
    public static void main(String[] args) throws FileNotFoundException, IllegalArgumentException, IOException {
        try(SparkSession spark = SparkSession.builder().appName("test").getOrCreate()) {
            //Listing directories to get the list of labels
            Set<String> labels = new HashSet<>();
            FileStatus[] filesstatus = FileSystem.get(spark.sparkContext().hadoopConfiguration()).listStatus(new Path("C:\\Users\\*\\Desktop\\model\\"));
            for(int i = 0; i < filesstatus.length; i++) {
                if(filesstatus[i].isDirectory()) {
                    labels.add(filesstatus[i].getPath().getName());
                }
            }

            List<MultilayerPerceptronClassificationModel> models = new ArrayList<>();
            // Here is the problem
            for(String label : labels) {
                System.out.println(label);
                MultilayerPerceptronClassificationModel model = MultilayerPerceptronClassificationModel.load("C:\\Users\\*\\Desktop\\model\\" + label + "\\CL\\");
                models.add(model);
            }

            System.out.println("done");
        }
    }
}
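
To check whether the thread count really grows with each load, the JVM's own thread statistics can be logged from inside the loop. The following is only a diagnostic sketch, not a fix: `ThreadCountProbe` and its two methods are hypothetical helper names that are not part of Spark, and it relies only on the standard `java.lang.management` API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic helper (not part of Spark): reports how many
// threads are alive in the current JVM and how they are distributed by name.
final class ThreadCountProbe {
    private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    // Number of live threads (daemon and non-daemon) in this JVM.
    static int liveThreads() {
        return THREADS.getThreadCount();
    }

    // Counts live threads per name prefix, with a trailing numeric suffix
    // removed, so that e.g. "Thread-12" and "Thread-13" are counted together.
    static Map<String, Integer> liveThreadsByNamePrefix() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            String prefix = t.getName().replaceAll("[-\\s]*\\d+$", "");
            counts.merge(prefix, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("live threads: " + liveThreads());
        System.out.println("by name prefix: " + liveThreadsByNamePrefix());
    }
}
```

Calling `ThreadCountProbe.liveThreads()` after each `load(...)` in the loop above (for instance, printing it every 100 models) would show whether the count keeps climbing, and `liveThreadsByNamePrefix()` would show which group of threads is responsible.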

I'm running the program on Windows, with Spark 2.1.1 and Hadoop 2.7.3, using the following command line:

 .\bin\spark-submit^
 --class entrepot.spark.maintest^
 --master local[*]^
  /C:/Users/*/eclipse-workspace/spark/target/spark-0.0.1-SNAPSHOT.jar

A small, repetitive sample of the model for one of my labels can be downloaded here: we.tl/T50s9UffYV (Why can't I post a simple link?)

PS: Even though the models are serializable, I couldn't save and load them all at once using a Java collection and an object stream, because I get a Scala conversion error. Instead, I'm using the save/load methods from MLlib on each model, which results in hundreds of thousands of files.
