Is there any other solution to the problem below that would write the data faster? There is no constraint on the cluster size or configuration.
from pyspark.sql import functions as f

df = emp_df.join(mem_df, emp_df.col1 == mem_df.col2)
df.count() --> 217 million
grouped_df = df.groupBy(
    "col1",
    "col2",
    "col3",
    "col4",
    "col5"
)
df1 = grouped_df \
    .agg(f.collect_list(f.struct("col3", "col7", "col2", "col8", "col10")).alias("items")) \
    .orderBy("col1", "col2", "items.col8")
df2 = df1.withColumn("newCol1", f.struct(*df1.columns)).drop(*df1.columns)
df3 = df2.withColumn("newCol2", f.struct(*df2.columns)).drop(*df2.columns)
OUTPUT:
{
  "newCol2": {
    "newCol1": {
      "col1": "val1",
      "col2": "val2",
      "col3": "val3",
      "col4": "val4",
      "col5": "val5",
      "col6": "val6",
      "col7": "val7",
      "col8": [
        {
          "col4_1": "val8_1",
          "col4_1": "val8_2",
          "col4_1": "val8_3",
          "col4_1": "val8_4",
          "col4_1": "val8_5",
          "col4_1": "val8_6"
        },
        {
          "col4_1": "val8_11",
          "col4_1": "val8_21",
          "col4_1": "val8_31",
          "col4_1": "val8_41",
          "col4_1": "val8_51",
          "col4_1": "val8_61"
        }
        ... the array can contain up to 10 such child entries
      ]
    }
  }
}
df3.count() --> 90 million
I am writing this into JSON as below:
df3.select("data", "newCol2.newCol1.col1", "newCol2.newCol1.col2") \
    .coalesce(1) \
    .write \
    .partitionBy("col1", "col2") \
    .mode("overwrite") \
    .format("json") \
    .save("abc/json_data/")
NOTE: The partition column "col1" has around 35 thousand distinct values. The partition column "col2" can be present in multiple "col1" partitions. The total volume of data is around 350 GB.
This ran for days; after 3 days and 8 hours I cancelled it.
PLATFORM: AZURE DATABRICKS.
It's hard to tell the specific bottlenecks of your job without looking into the Spark UI and your Databricks environment details.
But here are a few suggestions from my end to reduce your job's execution time:
- repartition the emp_df and mem_df on their join keys before joining them to see if it improves the join performance. Also consider broadcasting the smaller dataframe, though it may not be of much help if both dataframes are large (see the sketch after this list).
- Avoid df.count() when it is not required, to reduce resource usage.
- Drop the orderBy if it is not needed.
- Try tinkering with Spark properties and apply optimizations until your execution time comes down to a satisfactory value (a few commonly tuned properties are included in the sketch below).
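As a rough illustration of the first and last suggestions, here is a minimal PySpark sketch. It reuses the emp_df and mem_df names from your question; the partition count, the choice to broadcast mem_df, and the property values are assumptions to tune against your own cluster, not prescriptions.

from pyspark.sql import functions as f

# Repartition both sides on their join keys so matching keys land in the same partitions.
# 400 is an illustrative partition count, not a recommendation.
emp_repart = emp_df.repartition(400, "col1")
mem_repart = mem_df.repartition(400, "col2")

df = emp_repart.join(mem_repart, emp_repart.col1 == mem_repart.col2)

# If mem_df comfortably fits in executor memory, a broadcast hint avoids shuffling emp_df:
# df = emp_df.join(f.broadcast(mem_df), emp_df.col1 == mem_df.col2)

# A few Spark properties commonly tuned for large shuffles (values are placeholders):
spark.conf.set("spark.sql.adaptive.enabled", "true")             # adaptive query execution
spark.conf.set("spark.sql.shuffle.partitions", "800")            # shuffle parallelism
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))  # ~100 MB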
I'd also recommend you check out the official documentation targeted towards optimizations and performance tuning: