Creating parquet Petastorm dataset through Spark fails with Overflow error (larger than 4GB)

I'm trying to implement Uber's Petastorm dataset creation, which uses Spark to create a Parquet file, following the tutorial on their GitHub page.

The code:

import logging

from pyspark.sql import SparkSession
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row

spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):

    logging.info('Building RDD...')
    # row_generator yields lists of examples
    rows_rdd = sc.parallelize(ids) \
        .map(row_generator) \
        .flatMap(lambda x: dict_to_spark_row(MySchema, x))

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')

The RDD code executes successfully, but the .createDataFrame call fails with the following error:

_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB

This is my first experience with Spark, so I can't really tell if this error originates in Spark or Petastorm.

Looking through other solutions to this error (with respect to Spark, not Petastorm), I saw that it might have to do with the pickling protocol, but I couldn't confirm that, nor did I find a way to change the pickling protocol.

How could I avoid this error?

There are 2 answers

bluesummers (best answer)

The problem lies in the pickling that is done to pass data between the different processes: the default pickling protocol is 2, and we need to use protocol 4 in order to pass objects larger than 4 GB.
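
As a standalone illustration (it needs roughly 8-9 GB of free RAM to run), plain pickle reproduces the same limit outside of Spark: protocols below 4 cannot serialize a single bytes or str object larger than 4 GiB, while protocol 4 can.

import pickle

big = b'x' * (4 * 1024 ** 3 + 1)  # just over 4 GiB

try:
    pickle.dumps(big, protocol=2)  # protocols < 4 are limited to 4 GiB per bytes/str object
except OverflowError as e:
    print(e)  # cannot serialize a bytes object larger than 4 GiB

serialized = pickle.dumps(big, protocol=4)  # protocol 4 handles it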

To change the pickling protocol, before creating a Spark session, use the following code:

from pyspark import broadcast
import pickle


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # was 2; 4 is the first protocol supporting objects > 4GB
    f.close()
    return f.name


# Monkey-patch PySpark's Broadcast.dump so broadcasts are pickled with protocol 4
broadcast.Broadcast.dump = broadcast_dump
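
For completeness, a minimal sketch of how the patch fits into the question's script; the only requirement is that it runs before the SparkSession is created (the patch definition is repeated here so the snippet stands alone):

import pickle

from pyspark import broadcast
from pyspark.sql import SparkSession


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # protocol 4 supports objects > 4 GiB
    f.close()
    return f.name


broadcast.Broadcast.dump = broadcast_dump  # patch first...

spark = SparkSession.builder \
    .config('spark.driver.memory', '10g') \
    .master('local[4]') \
    .getOrCreate()  # ...then build the session and continue as in the question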

lockwobr

To build off bluesummers' answer:

The current Spark master branch fixes this issue, so I used this code to patch the dump function in the same way, just a bit more safely. [Tested with 2.3.2]

import pickle
import sys

from pyspark import broadcast
from pyspark.cloudpickle import print_exec
from pyspark.util import _exception_message


def broadcast_dump(self, value, f):
    try:
        # Use the highest protocol available instead of the hardcoded 2
        pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
    except pickle.PickleError:
        raise
    except Exception as e:
        msg = "Could not serialize broadcast: %s: %s" \
                % (e.__class__.__name__, _exception_message(e))
        print_exec(sys.stderr)
        raise pickle.PicklingError(msg)
    f.close()
    return f.name  # some PySpark versions use the return value to locate the broadcast file


broadcast.Broadcast.dump = broadcast_dump
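
Note that pickle.HIGHEST_PROTOCOL depends on the Python version: it is 4 on Python 3.4-3.7 and 5 on Python 3.8+, so either way this variant picks a protocol that can serialize objects larger than 4 GiB. A quick check:

import pickle

# 4 on Python 3.4-3.7, 5 on Python 3.8+; both support objects larger than 4 GiB
print(pickle.HIGHEST_PROTOCOL)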