Spark driver memory for rdd.saveAsNewAPIHadoopFile and workarounds


I'm having issues with a particular Spark method, saveAsNewAPIHadoopFile. The context is that I'm using pyspark to index RDDs of 1k, 10k, 50k, 500k, and 1m records into Elasticsearch (ES).

For a variety of reasons, the Spark context is quite underpowered, with a 2gb driver and a single 2gb executor.

I had no problems until about 500k records, when I started hitting java heap space errors. After increasing spark.driver.memory to about 4gb, I'm able to index more. However, there is a limit to how far that will scale, and we would like to index upwards of 500k, 1m, 5m, even 20m records.

I'm also constrained to using pyspark, for a variety of reasons. The bottleneck and breakpoint seems to be a Spark stage called take at SerDeUtil.scala:233: no matter how many partitions the RDD has going in, it drops down to a single partition, which I'm assuming is the driver collecting the partitions and preparing for indexing.

Now - I'm wondering if there is an efficient way to still use an approach like the following, given that constraint:

to_index_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource":"%s/record" % index_name,
        "es.nodes":"192.168.45.10:9200",
        "es.mapping.exclude":"temp_id",
        "es.mapping.id":"temp_id",
    }
)
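
For reference, here is a quick way to introspect what that call actually sees, using the same to_index_rdd:

# how many partitions does the RDD have at save time,
# and what does its lineage look like?
print(to_index_rdd.getNumPartitions())
print(to_index_rdd.toDebugString())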

In pursuit of a good solution, I might as well air some dirty laundry. I've got a terribly inefficient workaround that uses zipWithIndex to chunk the RDD and sends those subsets to the indexing function above. It looks a bit like this:

def index_chunks_to_es(spark=None, job=None, kwargs=None, rdd=None, chunk_size_limit=10000):

    # zip with index
    zrdd = rdd.zipWithIndex()

    # get count
    job.update_record_count(save=False)
    count = job.record_count

    # determine number of chunks (round up, and make sure steps ends up an int)
    steps = count / chunk_size_limit
    if steps % 1 != 0:
        steps = int(steps) + 1
    else:
        steps = int(steps)

    # evenly distribute chunks, while not exceeding chunk_size_limit
    dist_chunk_size = int(count / steps) + 1

    # loop through steps, appending subset to list for return
    for step in range(0, steps):

        # determine bounds
        lower_bound = step * dist_chunk_size
        upper_bound = (step + 1) * dist_chunk_size
        print(lower_bound, upper_bound)

        # select subset
        rdd_subset = zrdd.filter(lambda x: x[1] >= lower_bound and x[1] < upper_bound).map(lambda x: x[0])

        # index to ElasticSearch
        ESIndex.index_job_to_es_spark(
            spark,
            job=job,
            records_df=rdd_subset.toDF(),
            index_mapper=kwargs['index_mapper']
        )

It's slow, if I'm understanding correctly, because the zipWithIndex, filter, and map are re-evaluated for each resulting RDD subset. However, it's also memory efficient in the sense that 500k, 1m, 5m, etc. records are never sent to saveAsNewAPIHadoopFile at once; instead, only smaller RDDs that a relatively small Spark driver can handle are passed along.
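
One mitigation worth trying, assuming there's enough combined memory and disk to cache the zipped RDD, would be to persist it once so that lineage isn't recomputed for every chunk (a sketch only, not something I've verified against the heap problem):

from pyspark import StorageLevel

# persist so zipWithIndex and its upstream lineage are evaluated once,
# rather than once per chunk
zrdd = rdd.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK)

# ... chunked filter / map / index loop as above ...

zrdd.unpersist()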

Any suggestions for different approaches would be greatly appreciated. Perhaps that means not using the Elasticsearch-Hadoop connector, and instead sending raw JSON to ES?
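
If raw JSON is on the table, one middle ground is that the connector itself can accept pre-serialized documents via its es.input.json setting. A rough sketch, reusing the conf values from above and assuming each record can be dumped to a JSON string:

import json

# pair each JSON string with a throwaway key; the key is ignored by EsOutputFormat
json_rdd = to_index_rdd.map(lambda record: ('key', json.dumps(record)))

json_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource": "%s/record" % index_name,
        "es.nodes": "192.168.45.10:9200",
        "es.mapping.id": "temp_id",
        # tell the connector the values are already JSON
        "es.input.json": "yes",
        # note: es.mapping.exclude may not be combinable with raw JSON input
    }
)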

Update:

Looks like I'm still getting java heap space errors with this workaround, but I'm leaving it here to demonstrate my thinking toward a possible workaround. I had not anticipated that zipWithIndex would collect everything on the driver (which I'm assuming is the case here).

Update #2

Here is the debug string of the RDD I'm attempting to run through saveAsNewAPIHadoopFile:

(32) PythonRDD[6] at RDD at PythonRDD.scala:48 []
 |   MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   ShuffledRowRDD[3] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 +-(1) MapPartitionsRDD[2] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  MapPartitionsRDD[1] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  JDBCRDD[0] at javaToPython at NativeMethodAccessorImpl.java:-2 []

Update #3

Below is a DAG visualization for the take at SerDeUtil.scala:233 stage, which appears to run on the driver/localhost:

[DAG visualization: take at SerDeUtil.scala:233]

And here is a DAG for the saveAsNewAPIHadoopFile stage of a much smaller job (around 1k rows); the 500k-row attempts never actually fire, as the SerDeUtil stage above appears to be what triggers the java heap space problem for larger RDDs:

[DAG visualization: saveAsNewAPIHadoopFile, ~1k rows]


There is 1 answer.

Answered by ghukill:

I'm still a bit confused as to why this addresses the problem, but it does. When reading rows from MySQL with spark.read.jdbc and passing explicit bounds, the resulting RDD appears to be partitioned in such a way that saveAsNewAPIHadoopFile is successful for large RDDs.

I have a Django model for the DB rows, so I can get the first and last row IDs:

records = records.order_by('id')
start_id = records.first().id
end_id = records.last().id
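
The bounds dict passed below is presumably just built from those two IDs, something like:

# hypothetical glue: bounds derived from the first/last Django record IDs above
bounds = {
    'lowerBound': start_id,
    'upperBound': end_id,
}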

Then, pass those to spark.read.jdbc:

sqldf = spark.read.jdbc(
    settings.COMBINE_DATABASE['jdbc_url'],
    'core_record',
    properties=settings.COMBINE_DATABASE,
    column='id',
    lowerBound=bounds['lowerBound'],
    upperBound=bounds['upperBound'],
    numPartitions=settings.SPARK_REPARTITION
)
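
A quick sanity check on the resulting DataFrame (using the sqldf from above) is to look at the partition count and the per-partition row counts:

# the JDBC read should now yield numPartitions partitions up front,
# with rows spread across ranges of the id column
print(sqldf.rdd.getNumPartitions())
print(sqldf.rdd.mapPartitions(lambda part: [sum(1 for _ in part)]).collect())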

The debug string for the RDD shows that the originating RDD now has 10 partitions:

(32) PythonRDD[11] at RDD at PythonRDD.scala:48 []
 |   MapPartitionsRDD[10] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 |   ShuffledRowRDD[8] at javaToPython at NativeMethodAccessorImpl.java:-2 []
 +-(10) MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |   MapPartitionsRDD[6] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |   JDBCRDD[5] at javaToPython at NativeMethodAccessorImpl.java:-2 []

Where my understanding breaks down is that you can see there is a manual/explicit repartitioning to 32 in both the debug string from the question and the one above, which I thought would be enough to ease memory pressure on the saveAsNewAPIHadoopFile call. Apparently, though, the partitioning of the DataFrame (turned into an RDD) coming out of the original spark.read.jdbc matters even downstream: in the question's lineage the JDBCRDD has a single partition, so presumably one task still has to read the entire table before the repartition shuffle ever happens.