I'm having issues with a particular Spark method, saveAsNewAPIHadoopFile. The context is that I'm using PySpark to index RDDs of 1k, 10k, 50k, 500k, and 1m records into Elasticsearch (ES).
For a variety of reasons, the Spark context is quite underpowered: a 2gb driver and a single 2gb executor.
I had no problems until about 500k records, at which point I started getting Java heap space errors. Increasing spark.driver.memory to about 4gb lets me index more, but there is a limit to how far that will scale, and we would like to index upwards of 500k, 1m, 5m, even 20m records.
We're also constrained to using PySpark, again for a variety of reasons. The bottleneck and breaking point appears to be a Spark stage called take at SerDeUtil.scala:233: no matter how many partitions the RDD has going in, it drops down to one, which I'm assuming is the driver collecting the partitions and preparing for indexing.
Now - I'm wondering if there is an efficient way to still use an approach like the following, given that constraint:
to_index_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource": "%s/record" % index_name,
        "es.nodes": "192.168.45.10:9200",
        "es.mapping.exclude": "temp_id",
        "es.mapping.id": "temp_id",
    }
)
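For context, to_index_rdd here is assumed to be an RDD of (key, dict) pairs in the style of the ES-Hadoop PySpark examples, where the key is ignored because keyClass is NullWritable. A minimal sketch, with the record shape purely illustrative:

    # sketch only: records is assumed to be an RDD of Python dicts that
    # include the 'temp_id' field referenced in es.mapping.id above
    to_index_rdd = records.map(lambda doc: ('key', doc))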
In pursuit of a good solution, I might as well air some dirty laundry: I've got a terribly inefficient workaround that uses zipWithIndex to chunk an RDD and send those subsets to the indexing function above. It looks a bit like this:
def index_chunks_to_es(spark=None, job=None, kwargs=None, rdd=None, chunk_size_limit=10000):

    # zip with index
    zrdd = rdd.zipWithIndex()

    # get count
    job.update_record_count(save=False)
    count = job.record_count

    # determine number of chunks, rounding up so a partial chunk is included
    steps = count // chunk_size_limit
    if count % chunk_size_limit != 0:
        steps += 1

    # evenly distribute chunks, while not exceeding chunk_size_limit
    dist_chunk_size = int(count / steps) + 1

    # loop through steps, indexing each subset
    for step in range(0, steps):

        # determine bounds
        lower_bound = step * dist_chunk_size
        upper_bound = (step + 1) * dist_chunk_size
        print(lower_bound, upper_bound)

        # select subset
        rdd_subset = zrdd.filter(lambda x: lower_bound <= x[1] < upper_bound).map(lambda x: x[0])

        # index to Elasticsearch
        ESIndex.index_job_to_es_spark(
            spark,
            job=job,
            records_df=rdd_subset.toDF(),
            index_mapper=kwargs['index_mapper']
        )
It's slow, if I'm understanding correctly, because the zipWithIndex, filter, and map are re-evaluated for each resulting RDD subset. However, it's also memory efficient in that 500k, 1m, 5m, etc. records are never sent to saveAsNewAPIHadoopFile all at once; instead, only these smaller RDDs, which a relatively small Spark driver can handle, are.
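One mitigation for that re-evaluation, assuming the upstream lineage (the JDBC read and shuffle) is the expensive part, might be to persist the zipped RDD once before the chunk loop so only the filter and map run per chunk. A minimal sketch:

    from pyspark import StorageLevel

    # cache the zipped RDD so each chunk's filter/map does not recompute
    # the full upstream lineage (JDBC read, shuffles, etc.)
    zrdd = rdd.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK)
    try:
        pass  # ... chunk loop from index_chunks_to_es goes here ...
    finally:
        zrdd.unpersist()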
Any suggestions for different approaches would be greatly appreciated. Perhaps that means not using the Elasticsearch-Hadoop connector, and instead sending raw JSON to ES?
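For what that might look like, here is a rough sketch that bypasses the connector and bulk-indexes each partition with the elasticsearch-py client; the client being installed on the workers, the doc type, and the reuse of index_name and the node address from the conf above are all assumptions:

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    def index_partition(docs):
        # each task opens its own client and bulk-indexes only its partition,
        # so the full RDD never has to be funneled through one place
        es = Elasticsearch(["http://192.168.45.10:9200"])
        actions = (
            {
                "_index": index_name,       # assumed available from the driver scope above
                "_type": "record",          # mirrors es.resource "%s/record"
                "_id": doc.pop("temp_id"),  # mirrors es.mapping.id / es.mapping.exclude
                "_source": doc,
            }
            for doc in docs
        )
        bulk(es, actions)

    # drop the unused key half of the (key, dict) pairs, then index per partition
    to_index_rdd.map(lambda kv: kv[1]).foreachPartition(index_partition)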
Update:
Looks like I'm still getting Java heap space errors with this workaround, but I'm leaving it here to show my thinking. I had not anticipated that zipWithIndex would collect everything on the driver (which I'm assuming is what is happening here).
Update #2
Here is the debug string of the RDD I'm attempting to run through saveAsNewAPIHadoopFile:
(32) PythonRDD[6] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| ShuffledRowRDD[3] at javaToPython at NativeMethodAccessorImpl.java:-2 []
+-(1) MapPartitionsRDD[2] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| MapPartitionsRDD[1] at javaToPython at NativeMethodAccessorImpl.java:-2 []
| JDBCRDD[0] at javaToPython at NativeMethodAccessorImpl.java:-2 []
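As an aside, the number in parentheses at each stage of a debug string is that RDD's partition count, so the +-(1) line suggests the JDBC read itself produced a single partition before being shuffled out to 32. That can be checked directly on the DataFrame returned by the JDBC read (df here is an assumed name):

    # partition count of the DataFrame as it comes back from MySQL
    print(df.rdd.getNumPartitions())

    # full lineage with per-stage partition counts
    print(df.rdd.toDebugString().decode())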
Update #3
Below is a DAG visualization for the take at SerDeUtil.scala:233 stage, which appears to run on the driver/localhost:

[DAG visualization: take at SerDeUtil.scala:233]

And here is a DAG for the saveAsNewAPIHadoopFile stage of a much smaller job (around 1k rows); the 500k-row attempts never actually fire, as the SerDeUtil stage above appears to be what triggers the Java heap space problem for larger RDDs:

[DAG visualization: saveAsNewAPIHadoopFile]
I'm still a bit confused as to why this addresses the problem, but it does. When reading rows from MySQL with spark.read.jdbc and passing bounds, the resulting RDD appears to be partitioned in such a way that saveAsNewAPIHadoopFile succeeds for large RDDs. There is a Django model for the DB rows, so the first and last column IDs can be retrieved:
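The code for this step isn't included above; a sketch of what it might look like with the Django ORM, using a hypothetical Record model keyed by an auto-increment id:

    from django.db.models import Max, Min

    # hypothetical model representing the MySQL rows to be indexed
    bounds = Record.objects.aggregate(lower=Min('id'), upper=Max('id'))
    lower_bound = bounds['lower']
    upper_bound = bounds['upper']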
Then, pass those bounds to spark.read.jdbc, along with the column to partition on and a number of partitions, as sketched below.
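A hedged sketch of that bounded, partitioned read, with the URL, table name, and credentials as placeholders:

    # sketch of a partitioned JDBC read; url, table, and credentials are placeholders
    df = spark.read.jdbc(
        url="jdbc:mysql://192.168.45.10:3306/mydb",
        table="records",
        column="id",                # numeric column to partition on
        lowerBound=lower_bound,     # first ID from the Django query above
        upperBound=upper_bound,     # last ID from the Django query above
        numPartitions=10,
        properties={"user": "db_user", "password": "db_password"},
    )

    # same shape as before: (key, dict) pairs for saveAsNewAPIHadoopFile
    to_index_rdd = df.rdd.map(lambda row: ('key', row.asDict()))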
The debug string for the resulting RDD shows that the originating RDD now has 10 partitions. Where my understanding breaks down is that both the debug string from the question and this new one show a manual/explicit repartition to 32, which I thought would be enough to ease memory pressure on the saveAsNewAPIHadoopFile call; apparently, though, how the DataFrame (turned into an RDD) was partitioned by the original spark.read.jdbc matters even downstream.