Trying to create a Parquet Petastorm dataset


I'm currently trying to create a Parquet Petastorm dataset to store a collection of videos. My code is:

import numpy as np
import pandas as pd
import skvideo.io
from pyspark.sql import SparkSession
from petastorm.codecs import NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

# metadata_config and petastorm_config are project-local settings modules.
# Single-field schema: a variable-shape uint8 video tensor
# (frames, height, width, channels).
MotionSchema = Unischema('MotionSchema', [
    UnischemaField(
        'video', np.uint8, (None, None, None, 3), NdarrayCodec(), False)])

session_builder = SparkSession\
    .builder\
    .appName('Motion videos dataset creation')

spark = session_builder.getOrCreate()
spark_context = spark.sparkContext

# Metadata table with an "@path" column pointing at the raw video files.
motion_df = pd.read_feather(metadata_config.OUTPUT_PATH)

# materialize_dataset writes the Petastorm metadata alongside the Parquet files.
with materialize_dataset(spark, petastorm_config.OUTPUT_PATH, MotionSchema):
    motion_path_rdd = spark_context.parallelize(
        motion_df["@path"].tolist())
    print(motion_path_rdd.count())

    # Decode each video file into a (frames, height, width, 3) uint8 ndarray.
    motion_path_np_rdd = motion_path_rdd\
        .map(lambda path: {
            MotionSchema.video.name: skvideo.io.vread(path)
        })
    # Serialize each dict into a Spark Row using the Unischema codecs.
    rows_rdd = motion_path_np_rdd.map(
        lambda row: dict_to_spark_row(MotionSchema, row))
    spark.createDataFrame(
        rows_rdd, MotionSchema.as_spark_schema())\
        .coalesce(10)\
        .write\
        .mode('overwrite')\
        .option('compression', 'none')\
        .parquet(petastorm_config.OUTPUT_PATH)

When I execute it, the following error occurs:

File "build_motion_videos_petastorm.py", line 59, in <module>
.parquet(petastorm_config.OUTPUT_PATH)
File "/home/guilherme/spark-3.0.0-preview2-bin-hadoop2.7/python/pyspark/sql/readwriter.py", line 879, in parquet
self._jwrite.parquet(path)
File "/home/guilherme/.virtualenvs/motion_understanding_venv/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/guilherme/spark-3.0.0-preview2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 98, in deco
return f(*a, **kw)
File "/home/guilherme/.virtualenvs/motion_understanding_venv/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o54.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
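
The driver-side trace stops at the Py4JJavaError, so the Python exception actually raised inside the executor task is not visible above. A minimal sketch for reproducing the encoding step locally, outside Spark, which should surface that underlying error directly (sample_path is just an illustrative name for the first metadata entry):

# Run the decode/encode step on a single video, outside Spark, so any
# failure raises directly instead of being wrapped in a Py4JJavaError.
sample_path = motion_df["@path"].iloc[0]
row = {MotionSchema.video.name: skvideo.io.vread(sample_path)}
dict_to_spark_row(MotionSchema, row)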

I have no idea what is going on; any clue would be helpful.
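
In case it helps to narrow things down, here is a hypothetical isolation test that pushes a tiny fixed-shape dummy array through the same schema, so only the Petastorm/Parquet write path is exercised (the shape, row count, and scratch path are arbitrary placeholders):

# If this also fails, the problem is in the write path rather than in
# skvideo.io.vread; '/tmp/petastorm_smoke_test' is a placeholder location.
dummy_rdd = spark_context.parallelize(range(4)).map(
    lambda _: {MotionSchema.video.name: np.zeros((2, 4, 4, 3), dtype=np.uint8)})
dummy_rows = dummy_rdd.map(lambda row: dict_to_spark_row(MotionSchema, row))
spark.createDataFrame(dummy_rows, MotionSchema.as_spark_schema()) \
    .write.mode('overwrite').parquet('/tmp/petastorm_smoke_test')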

Thank you.
