pyspark delta table: How to save a grouped Dataframe to Different Tables


Similar to this question: how can I write different groups of a DataFrame to different Delta Live tables? I want something like the following, where I am not limited to a pandas DataFrame, and apply can pass either a Spark DataFrame or a SparkSession to the aggregation function.

def mycustomNotPandaAgg(key, iterator, sparkSession_or_sparkDataframe):  # pseudocode for the apply() I wish existed
    # either: build the group's DataFrame myself from the iterator
    temp_df = sparkSession.createDataFrame(iterator)  # I can apply a schema here
    temp_df.createOrReplaceTempView("temp_df")
    sparkSession.sql("insert into ... key select * from temp_df")  # key is the table name
    # or: the group arrives already as a Spark DataFrame and is written directly
    sparkDataframe.write.saveAsTable(key)  # sparkDataframe is created internally from each group and passed into this apply function

my_df.groupBy("table_name").apply(mycustomNotPandaAgg)

PS - I have already tried the filter approach: I filter the same DataFrame once per table, get N DataFrames (one per table), and save them. It is not efficient because the data is skewed per key, and even if I persist the DataFrame before filtering, Spark still launches a separate job for each filter.
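Roughly, that filter approach looks like the sketch below; the table_name column is from my example above, and the Delta format and append mode are just illustrative.

from pyspark.sql.functions import col

my_df.persist()  # persisting does not stop Spark from running one job per filter
tables = [r["table_name"] for r in my_df.select("table_name").distinct().collect()]
for name in tables:
    (my_df.filter(col("table_name") == name)
          .write.format("delta")
          .mode("append")
          .saveAsTable(name))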


There is 1 answer

Philip Dakin

One way you could accomplish this without pulling all data to the driver is by collecting the distinct keys, then writing each filtered DataFrame individually:

from pyspark.sql.functions import col

filters = rtd.select("CustomerID").distinct().collect()  # rtd is the source DataFrame to split by key
for f in filters:
    rtd.filter(col("CustomerID") == f[0]).show()  # Replace show() with your write logic.

Note that, unfortunately, you will have to write the output tables serially. You could work around this with multiprocessing, or perhaps another commenter has a more Spark-native way to write groupBy results while still using Spark for parallelization.
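As a rough sketch of that workaround, a driver-side thread pool (threads rather than processes, since each write is its own Spark job and the driver mostly waits on the cluster) can overlap the per-key writes. The rtd name and CustomerID column are carried over from the snippet above; the Delta format and the customers_<id> table-name pattern are assumptions to adjust for your setup.

from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.functions import col

def write_one_key(customer_id):
    # Each call launches its own Spark job; the driver thread just blocks on it.
    (rtd.filter(col("CustomerID") == customer_id)
        .write.format("delta")
        .mode("append")
        .saveAsTable(f"customers_{customer_id}"))  # table name pattern is an assumption

keys = [row["CustomerID"] for row in rtd.select("CustomerID").distinct().collect()]
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(write_one_key, keys))  # consume the iterator so any write error surfaces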