OutOfMemoryError when using PySpark to read files in local mode

I have about a dozen gpg-encrypted files containing data I'd like to analyze using PySpark. My strategy is to apply a decryption function as a flat map to each file and then proceed with processing at the record level:

def read_fun_generator(filename):
    with gpg_open(filename[0].split(':')[-1], 'r') as f:
        for line in f:
            yield line.strip()

gpg_files = sc.wholeTextFiles("/path/to/files/*.gpg")
rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()  # <-- For example...

This approach works quite well when using a single thread in local mode, i.e. setting the master to local[1]. However, using any more than a single thread causes an OutOfMemoryError to be thrown. I've tried increasing spark.executor.memory and spark.driver.memory to 30g, but this seems not to help. I can confirm in the UI that those settings have stuck. (My machine has over 200GB available.) However, I've noticed in the logs that the block manager seems to be starting with only 265.4 MB of memory. I wonder if this is related?

Here is the full configuration I'm starting with:

conf = (SparkConf()
         .setMaster("local[*]")
         .setAppName("pyspark_local")
         .set("spark.executor.memory", "30g")
         .set("spark.driver.memory", "30g")
         .set("spark.python.worker.memory", "5g")
       )
sc = SparkContext(conf=conf)
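
(Side note on the 265.4 MB: since local mode runs the driver and executor in the same JVM, and that JVM is already up by the time the SparkConf above is applied, I suspect spark.driver.memory set in code never takes effect and would have to be passed at launch instead. Roughly what I mean is sketched below; the exact PYSPARK_SUBMIT_ARGS format may vary by Spark version.)

import os
# must be set before the SparkContext (and hence the driver JVM) is created
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 30g pyspark-shell"
# ... then build SparkConf / SparkContext as above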

This is the stack trace from my logs:

15/06/10 11:03:30 INFO SparkContext: Running Spark version 1.3.1
15/06/10 11:03:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/10 11:03:31 INFO SecurityManager: Changing view acls to: santon
15/06/10 11:03:31 INFO SecurityManager: Changing modify acls to: santon
15/06/10 11:03:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(santon); users with modify permissions: Set(santon)
15/06/10 11:03:31 INFO Slf4jLogger: Slf4jLogger started
15/06/10 11:03:31 INFO Remoting: Starting remoting
15/06/10 11:03:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:44347]
15/06/10 11:03:32 INFO Utils: Successfully started service 'sparkDriver' on port 44347.
15/06/10 11:03:32 INFO SparkEnv: Registering MapOutputTracker
15/06/10 11:03:32 INFO SparkEnv: Registering BlockManagerMaster
15/06/10 11:03:32 INFO DiskBlockManager: Created local directory at /tmp/spark-24dc8f0a-a89a-44f8-bb95-cd5514e5bf0c/blockmgr-85b6f082-ff5a-4a0e-b48a-1ec62715dda0
15/06/10 11:03:32 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/06/10 11:03:32 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7b2172ed-d658-4e11-bbc1-600697f3255e/httpd-5423f8bc-ec43-48c5-9367-87214dad54f4
15/06/10 11:03:32 INFO HttpServer: Starting HTTP Server
15/06/10 11:03:32 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/10 11:03:32 INFO AbstractConnector: Started SocketConnector@0.0.0.0:50366
15/06/10 11:03:32 INFO Utils: Successfully started service 'HTTP file server' on port 50366.
15/06/10 11:03:32 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/10 11:03:32 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/10 11:03:32 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/06/10 11:03:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/06/10 11:03:32 INFO SparkUI: Started SparkUI at localhost:4040
15/06/10 11:03:32 INFO Executor: Starting executor ID <driver> on host localhost
15/06/10 11:03:32 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:44347/user/HeartbeatReceiver
15/06/10 11:03:33 INFO NettyBlockTransferService: Server created on 46730
15/06/10 11:03:33 INFO BlockManagerMaster: Trying to register BlockManager
15/06/10 11:03:33 INFO BlockManagerMasterActor: Registering block manager localhost:46730 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 46730)
15/06/10 11:03:33 INFO BlockManagerMaster: Registered BlockManager
15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(215726) called with curMem=0, maxMem=278302556
15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.7 KB, free 265.2 MB)
15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(31533) called with curMem=215726, maxMem=278302556
15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 265.2 MB)
15/06/10 11:05:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:46730 (size: 30.8 KB, free: 265.4 MB)
15/06/10 11:05:19 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/06/10 11:05:19 INFO SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16
15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16
15/06/10 11:05:22 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 71665121
15/06/10 11:05:22 INFO SparkContext: Starting job: count at <timed exec>:2
15/06/10 11:05:22 INFO DAGScheduler: Got job 0 (count at <timed exec>:2) with 2 output partitions (allowLocal=false)
15/06/10 11:05:22 INFO DAGScheduler: Final stage: Stage 0(count at <timed exec>:2)
15/06/10 11:05:22 INFO DAGScheduler: Parents of final stage: List()
15/06/10 11:05:22 INFO DAGScheduler: Missing parents: List()
15/06/10 11:05:22 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[1] at count at <timed exec>:2), which has no missing parents
15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(6264) called with curMem=247259, maxMem=278302556
15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.1 KB, free 265.2 MB)
15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(4589) called with curMem=253523, maxMem=278302556
15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.5 KB, free 265.2 MB)
15/06/10 11:05:23 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46730 (size: 4.5 KB, free: 265.4 MB)
15/06/10 11:05:23 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/06/10 11:05:23 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/06/10 11:05:23 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at count at <timed exec>:2)
15/06/10 11:05:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/06/10 11:05:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1903 bytes)
15/06/10 11:05:23 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 3085 bytes)
15/06/10 11:05:23 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/10 11:05:23 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/10 11:05:26 INFO WholeTextFileRDD: Input split: Paths:[gpg_files]
15/06/10 11:05:40 ERROR Utils: Uncaught exception in thread stdout writer for /anaconda/python/bin/python
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:405)
    at org.apache.hadoop.io.Text.decode(Text.java:382)
    at org.apache.hadoop.io.Text.toString(Text.java:280)
    at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Exception in thread "stdout writer for /anaconda/python/bin/python" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:405)
    at org.apache.hadoop.io.Text.decode(Text.java:382)
    at org.apache.hadoop.io.Text.toString(Text.java:280)
    at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
15/06/10 11:05:47 INFO PythonRDD: Times: total = 24140, boot = 2860, init = 664, finish = 20616
15/06/10 11:05:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1873 bytes result sent to driver
15/06/10 11:05:47 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 24251 ms on localhost (1/2)

Has anyone run into this problem? Is there a setting I'm not aware of that I should modify? It seems like this should be possible...

1 Answer

Answered by vvladymyrov (accepted):

The thing with sc.wholeTextFiles("/path/to/files/*.gpg") is that it returns a PairRDD whose key is the file name and whose value is the entire contents of that file.
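
For example (the file name below is hypothetical), the first record of such an RDD looks roughly like this, with the whole encrypted file materialized as a single string in the JVM:

gpg_files = sc.wholeTextFiles("/path/to/files/*.gpg")
first_name, first_contents = gpg_files.first()
# first_name is something like u'file:/path/to/files/somefile.gpg'
# first_contents is the entire (encrypted) file as one string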

It looks like you are not using the file contents at all, yet you are still telling Spark to read every file from disk into memory and ship it to the workers.

If your goal is to process only the list of file names, and to read their contents yourself with gpg_open, you can do this instead:

import glob

def read_fun_generator(filename):
    # file names here are plain paths from glob, so the split(':') is just a
    # harmless safeguard against a "file:" prefix
    with gpg_open(filename.split(':')[-1], 'r') as f:
        for line in f:
            yield line.strip()

gpg_filelist = glob.glob("/path/to/files/*.gpg")
# generate an RDD with one file name per record
gpg_files = sc.parallelize(gpg_filelist)

rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()  # <-- For example...

This keeps the encrypted file contents out of Spark's JVM entirely, which should greatly reduce the amount of memory it needs.
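
A further tweak you could try (untested on your data, so treat it as a sketch): control the partitioning when you parallelize the file list, e.g. one partition per file, so that each local thread decrypts exactly one file at a time:

# one partition per file: each task handles a single gpg file
gpg_files = sc.parallelize(gpg_filelist, numSlices=len(gpg_filelist))
rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()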