I'm trying to implement dataset creation with Uber's Petastorm, which uses Spark to write a Parquet file, following the tutorial on their GitHub page.
The code:
import logging

from pyspark.sql import SparkSession
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row

spark = SparkSession.builder \
    .config('spark.driver.memory', '10g') \
    .master('local[4]') \
    .getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):
    logging.info('Building RDD...')
    # row_generator yields a list of example dicts per id
    rows_rdd = sc.parallelize(ids) \
        .map(row_generator) \
        .flatMap(lambda rows: [dict_to_spark_row(MySchema, row) for row in rows])

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')
The RDD code executes successfully, but the .createDataFrame call fails with the following error:
_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB
This is my first experience with Spark, so I can't really tell if this error originates in Spark or Petastorm.
Looking through other solutions to this error (with respect to Spark, not Petastorm), I saw that it might have to do with the pickling protocol, but I can't confirm that, nor did I find a way to alter the pickling protocol.
How could I avoid this error?
The problem lies in the pickling used to pass data between the driver and worker processes: by default Spark pickles with protocol 2, and protocol 4 (available since Python 3.4) is needed to serialize objects larger than 4 GiB.
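The limit comes from Python's pickle itself, not from Spark or Petastorm. A minimal sketch that reproduces it outside Spark (note that building and pickling a string just over 4 GiB needs roughly 12 GB of RAM):

import pickle

big = 'x' * (2**32 + 1)  # a string just over 4 GiB

try:
    pickle.dumps(big, protocol=2)   # the protocol Spark 2.x uses for broadcasts
except OverflowError as e:
    print(e)                        # cannot serialize a string larger than 4GiB

data = pickle.dumps(big, protocol=4)  # protocol 4 handles objects > 4 GiB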
To change the pickling protocol, use the following code before creating the Spark session.
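This is a sketch assuming Spark 2.x, where pyspark's Broadcast.dump hardcodes protocol 2; it monkey-patches that method to pickle with protocol 4 (the name broadcast_dump is just a local helper):

import pickle
from pyspark import broadcast

def broadcast_dump(self, value, f):
    # Same behavior as Broadcast.dump in Spark 2.x, but with pickle protocol 4
    # instead of 2, so broadcast values larger than 4 GiB can be serialized.
    pickle.dump(value, f, 4)
    f.close()
    return f.name

broadcast.Broadcast.dump = broadcast_dump

Apply the patch before calling SparkSession.builder...getOrCreate(), so that any broadcast created while materializing the dataset uses the new protocol.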