Running out of heap space in sparklyr, but have plenty of memory

4.4k views Asked by At

I am getting heap space errors on even fairly small datasets. I can be sure that I'm not running out of system memory. For example, consider a dataset containing about 20M rows and 9 columns, and that takes up 1GB on disk. I am playing with it on a Google Compute node with 30gb of memory.

Let's say that I have this data in a dataframe called df. The following works fine, albeit somewhat slowly:

library(tidyverse) 
uniques <- search_raw_lt %>%
    group_by(my_key) %>%
    summarise() %>%
    ungroup()

The following throws java.lang.OutOfMemoryError: Java heap space.

library(tidyverse)
library(sparklyr)
sc <- spark_connect(master = "local")

df_tbl <- copy_to(sc, df)

unique_spark <- df_tbl %>%
  group_by(my_key) %>%
  summarise() %>%
  ungroup() %>%
  collect()

I tried this suggestion for increasing the heap space to Spark. The problem persists. Watching the machine's state on htop, I see that total memory usage never goes over about 10gb.

library(tidyverse)
library(sparklyr)

config <- spark_config()
config[["sparklyr.shell.conf"]] <- "spark.driver.extraJavaOptions=-XX:MaxHeapSize=24G"

sc <- spark_connect(master = "local")

df_tbl <- copy_to(sc, df)

unique_spark <- df_tbl %>%
  group_by(my_key) %>%
  summarise() %>%
  ungroup() %>%
  collect()

Finally, per Sandeep's comment, I tried lowering MaxHeapSize to 4G. (Is MaxHeapSize per virtual worker or for the entire Spark local instance?) I still got the heap space error, and again, I did not use much of the system's memory.

1

There are 1 answers

1
David Bruce Borenstein On BEST ANSWER

In looking into Sandeep's suggestions, I started digging into the sparklyr deployment notes. These mention that the driver might run out of memory at this stage, and to tweak some settings to correct it.

These settings did not solve the problem, at least not initially. However, isolating the problem to the collect stage allowed me to find similar problems using SparkR on SO.

These answers depended in part on setting the environment variable SPARK_MEM. Putting it all together, I got it to work as follows:

library(tidyverse)
library(sparklyr)

# Set memory allocation for whole local Spark instance
Sys.setenv("SPARK_MEM" = "13g")

# Set driver and executor memory allocations
config <- spark_config()
config$spark.driver.memory <- "4G"
config$spark.executor.memory <- "1G"

# Connect to Spark instance
sc <- spark_connect(master = "local")

# Load data into Spark
df_tbl <- copy_to(sc, df)

# Summarise data
uniques <- df_tbl %>%
  group_by(my_key) %>%
  summarise() %>%
  ungroup() %>%
  collect()