similar to this question. How can I do the same to write different groups of a DataFrame to different Delta Live Tables? I want something like the following, where I am not limited to a pandas DataFrame, and apply passes either a Spark DataFrame or a SparkSession to the aggregate function:
    def mycustomNotPandaAgg(key, iterator, sparkSession_or_sparkDataframe):
        # option 1: receive a SparkSession and build the group's DataFrame myself
        temp_df = sparkSession.createDataFrame(iterator)  # I can apply a schema here
        temp_df.createOrReplaceTempView("temp_df")
        sparkSession.sql("insert into ... select * from temp_df")  # key is the table name
        # or
        # option 2: receive a Spark DataFrame created internally from each group
        sparkDataframe.write.saveAsTable(key)

    my_df.groupBy("table_name").apply(mycustomNotPandaAgg)
PS: I have already tried the filter approach, where I filter the same DataFrame once per table, get N DataFrames (one for each table), and save them. It is not efficient because the data is skewed per key, and even if I persist the DataFrame before filtering, Spark still launches a separate job for each filter.
One way you could accomplish this without pulling all of the data to the driver is to collect only the distinct keys, then write each filtered DataFrame individually:
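A minimal sketch of that approach (it assumes my_df is the source DataFrame from the question, table_name is the grouping column, and each key should be appended to a Delta table of the same name; the format and write mode are assumptions you would adjust):

    # collect only the distinct keys to the driver, not the data itself
    keys = [row["table_name"] for row in my_df.select("table_name").distinct().collect()]

    # cache the source so the repeated filters below do not rescan it every time
    my_df.persist()

    for key in keys:
        (my_df
            .filter(my_df["table_name"] == key)
            .write
            .format("delta")
            .mode("append")
            .saveAsTable(key))

    my_df.unpersist()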
Note that, unfortunately, you will have to write the output tables serially. You could work around this with multiprocessing, or perhaps another commenter has a more Spark-native way to write groupBy results while still using Spark for parallelization.
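As a sketch of that workaround, a small thread pool on the driver is usually sufficient (rather than multiprocessing), because the heavy lifting runs on the executors and the driver threads mostly block waiting for each job to finish. This reuses keys and my_df from the snippet above; the pool size of 4 is an arbitrary choice:

    from concurrent.futures import ThreadPoolExecutor

    def write_group(key):
        # same per-key write as in the serial loop above
        (my_df
            .filter(my_df["table_name"] == key)
            .write
            .format("delta")
            .mode("append")
            .saveAsTable(key))

    # submit the per-key writes concurrently; each one still runs as a normal Spark job
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(write_group, keys))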