Spark DataFrame: when does it materialize?


I have a Spark question:

I have a job that errors out with: 403 Access Denied on S3.

The spark job basically:

  1. Gets data from LF (Lake Formation) resource-linked tables in the Glue Catalog
  2. Creates temp views
  3. Runs a bunch of transformations
  4. Stores the data in an external location

I get sporadic errors in step 3, where we run a bunch of transformations. I say sporadic because sometimes there are no errors, and other times the error pops up on any one of the functions in step 3.

Wouldn't running a Spark SQL select statement (and storing it as a temp view) on a Glue dynamic frame materialize the data in memory within the Spark session? e.g.:

    df = glueContext.create_dynamic_frame_from_catalog(args)
    df = df.toDF()
    df.createOrReplaceTempView("tbl1")
    dfnew = spark.sql("select * from tbl1")
    dfnew.createOrReplaceTempView("tbl2")


...then step 3 runs transformations on tbl2 (this is where the error happens).

Is my understanding correct that tbl1 has materialized in memory within the Spark session, but tbl2 is still lazy? If so, then if I run a Spark SQL statement on tbl2, it will materialize by querying tbl1, not the Glue Catalog source tables, correct?

How can I ensure that, in the above script, the LF tables are not accessed again after loading them into a dynamic frame? The upstream data is continuously updated.


There is 1 answer.

Answer from Frosty:

Your understanding of Spark SQL views is not correct.

Spark SQL views are lazily evaluated and don't materialize until you call an action. In fact, NONE of the lazily evaluated parts (called transformations in Spark terminology) are materialized until and unless you call an action.

All Spark does is build a DAG in the background of all the transformations you have done so far, and it materializes all of that only when you call an action.

df.createOrReplaceTempView("tbl1")       # lazily evaluated
dfnew = spark.sql("select * from tbl1")  # lazily evaluated
dfnew.createOrReplaceTempView("tbl2")    # lazily evaluated
dfnew.show()                             # action call --> materializes all the transformations done so far
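Also keep in mind that, unless you cache, every action re-executes the whole lineage from the source. A minimal sketch of the difference, reusing the dfnew from above (only a sketch; it assumes the data fits in executor memory):

# Without caching, each action re-runs the full plan, including the read
# from the Glue Catalog / Lake Formation source:
dfnew.count()  # reads from the source tables
dfnew.show()   # reads from the source tables again

# Marking the DataFrame for caching and then calling an action keeps the
# data in executor memory for subsequent actions:
dfnew.cache()  # lazy; only marks dfnew for caching
dfnew.count()  # action; reads the source once and populates the cache
dfnew.show()   # served from the cache, not the source tables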

The error you are getting is most likely caused by missing permissions while reading from or writing to a particular S3 location.

I hope this answers the first half of your question. It could be explained better if you shared what happens in the transformations, whether you call any actions during them, or, best of all, the stack trace of the error, so a more definitive answer can be given.

Also, if you are using Spark 3.0 or higher, you can materialize your transformations by using the noop write format.

df.write.mode("overwrite").format("noop").save()

You simply specify it as the write format; it materializes the query and executes all the transformations, but it does not write the result anywhere.
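For the second half of your question (making sure the LF tables are not read again after the initial load), one approach is to cache the DataFrame and force the cache to be populated with the noop write before creating the temp views. This is only a sketch under the assumption that the data fits in executor memory; the database and table names below are placeholders, not your actual catalog entries:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# placeholder database/table names for illustration
dyf = glueContext.create_dynamic_frame_from_catalog(database="my_db", table_name="source_tbl")
df = dyf.toDF()

df.cache()                                        # lazy; marks df for caching
df.write.mode("overwrite").format("noop").save()  # action; reads the LF tables once and fills the cache

df.createOrReplaceTempView("tbl1")
# spark.sql(...) queries against tbl1 now run on the cached data,
# so the upstream LF tables are not hit again.

Note that cached partitions that get evicted from memory would be recomputed from the source, so for a strict point-in-time snapshot you may prefer writing the data to an intermediate location and reading it back from there.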