How to make a PySpark job properly parallelize across multiple nodes and avoid memory issues?


I am currently working on a PySpark job (Spark 2.2.0) that trains a Latent Dirichlet Allocation (LDA) model on a set of documents. The input documents are provided as a CSV file stored on Google Cloud Storage.

The following code ran successfully on a single-node Google Cloud Dataproc cluster (4 vCPUs / 15 GB of memory) with a small subset of documents (~6,500), a low number of topics to generate (10), and a low number of iterations (100). However, attempts with a larger set of documents or higher values for either the number of topics or the number of iterations quickly led to memory issues and job failures.

Also, when submitting this job to a 4-node cluster, I could see that only one worker node was actually doing any work (30% CPU usage), which leads me to think that the code is not properly optimized for parallel processing.

Code

import pyspark
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA

conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True),  # id of the document
                         StructField("cleaned_content", StringType(), True)])  # cleaned text content (used for LDA)

# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)

print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"

print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"

# Step 2: Extract words (split_row is a custom tokenization helper, defined elsewhere)
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))

# Step 3: Generate count vectors (BOW) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)

# Instantiate LDA model
lda = LDA(k=100,  # number of topics
          optimizer="online", # 'online' or 'em'
          maxIter=100,
          featuresCol="features",
          topicConcentration=0.01,  # beta
          optimizeDocConcentration=True,  # alpha
          learningOffset=2.0,
          learningDecay=0.8,
          checkpointInterval=10,
          keepLastCheckpoint=True)

# Step 4: Train LDA model on corpus (this is the long part of the job)
lda_model = lda.fit(dataset)

# Save LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")

Below are examples of the warning and error messages that were encountered:

WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
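
For reference, the spark.yarn.executor.memoryOverhead property mentioned in these warnings can be set on the SparkConf when the session is created. A minimal sketch, where the 2048 MB value is only an illustrative assumption that would need tuning for the cluster:

import pyspark
from pyspark.sql import SparkSession

# Raise the YARN memory overhead suggested by the warnings above.
conf = (pyspark.SparkConf()
        .setAppName("lda-training")
        .set("spark.yarn.executor.memoryOverhead", "2048"))  # value in MB on Spark 2.2; illustrative only
spark = SparkSession.builder.config(conf=conf).getOrCreate()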

Questions

  • Is there any optimization that can be done to the code itself to ensure its scalability?
  • How can we make Spark distribute the job across all worker nodes and hopefully avoid memory issues?
There is 1 answer.

Accepted answer, by Dennis Huo:

If your input data is small, then even if your pipeline ends up doing dense computation on that small data, Spark's size-based partitioning will produce too few partitions to scale out. Since your getNumPartitions() prints 1, Spark will use at most one executor core to process that data, which is why you only see one worker node doing any work.

You can try changing your initial spark.read.csv line to include a repartition at the end:

doc_df = spark.read.csv(path="gs://...", ...).repartition(32)

You can then verify it did what you expected by checking that getNumPartitions() prints 32 on the later line.
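
As a rough sketch of how the repartition count could be tied to the cluster's available cores instead of a hard-coded 32, reusing the SparkSession and csv_schema from the question (the factor of 3 partitions per core is an illustrative assumption, not a rule):

# Aim for a few partitions per available executor core so every worker gets tasks
num_partitions = spark.sparkContext.defaultParallelism * 3

doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema) \
               .repartition(num_partitions)

print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# Should now print num_partitions instead of 1, so the later CountVectorizer
# and LDA stages can spread their tasks across all executors.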