Writing over 350 GB of data to JSON, grouped with collect_list and partitioned by 2 columns


Is there any other way to solve the problem below and write the data faster? There is no constraint on cluster size or configuration.

df = emp_df.join(mem_df, emp_df.col1 == mem_df.col2)

df.count() --> 217 million

    grouped_df = df.groupBy(
        "col1",
        "col2",
        "col3",
        "col4",
        "col5"
    )

    df1 = grouped_df \
        .agg(f.collect_list(f.struct("col3", "col7", "col2", "col8", "col10")).alias("items")) \
        .orderBy("col1","col2","items.col8")
    
    df2 = df1.withColumn("newCol1", f.struct(*df1.columns)).drop(*df1.columns)
    df3 = df2.withColumn("newCol2", f.struct(*df2.columns)).drop(*df2.columns)

OUTPUT:

    {
        "newCol2": {
            "newCol1":  {
                "col1": "val1",
                "col2": "val2",
                "col3": "val3",
                "col4": "val4",
                "col5": "val5",
                "col6": "val6",
                "col7": "val7",         
                "col8": [{
                    "col4_1": "val8_1",
                    "col4_1": "val8_2",
                    "col4_1": "val8_3",
                    "col4_1": "val8_4",
                    "col4_1": "val8_5",
                    "col4_1": "val8_6"
                },
                {
                    "col4_1": "val8_11",
                    "col4_1": "val8_21",
                    "col4_1": "val8_31",
                    "col4_1": "val8_41",
                    "col4_1": "val8_51",
                    "col4_1": "val8_61"
                
                }...it can go upto 10 child
                
                ]
            }
        }
    }

df3.count() --> 90 million

I am writing this into JSON as below:

df3.select("data", "newCol2.newCol1.col1", "newCol2.newCol1.col2") \
    .coalesce(1) \
    .write \
    .partitionBy("col1","col2") \
    .mode("overwrite") \
    .format("json") \
    .save("abc/json_data/")

NOTE: The partition column "col1" has around 35 thousand distinct values. The partition column "col2" can appear under multiple col1 partitions. The total volume of data is around 350 GB.

This ran for days; after 3 days and 8 hours, I cancelled it.

PLATFORM: AZURE DATABRICKS.


1 Answer

Answer by Vikas Sharma:

It's hard to identify the specific bottlenecks of your job without looking at the Spark UI and your Databricks environment details.

That said, here are a few suggestions to reduce your job's execution time:

  • Try to balance the partitions according to your data by calling repartition on emp_df and mem_df before joining them, and check whether it improves join performance. Also consider broadcasting the smaller DataFrame, though that may not help much if both DataFrames are large (see the sketch after this list).
  • If possible, reduce the number of columns and filter the DataFrames before joining or grouping them. This reduces how much data has to be shuffled.
  • Remove unnecessary actions such as df.count() from the code so you don't spend resources where they are not required.
  • Cache intermediate DataFrames where it helps, to avoid recomputation.
  • Avoid orderBy if it is not needed.
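
As a rough illustration of the repartition/broadcast, column-pruning, and caching points above, here is a minimal PySpark sketch. The split of columns between emp_df and mem_df, the commented-out filter, and the partition count of 2000 are assumptions for illustration, not values taken from your job:

    from pyspark.sql import functions as f

    # Prune columns early: keep only what the groupBy/collect_list needs.
    # Which columns live in which DataFrame is assumed here; adjust to your schema.
    emp_slim = emp_df.select("col1", "col3", "col4", "col5")
    mem_slim = mem_df.select("col2", "col7", "col8", "col10")

    # Filter before the join if any rows can be discarded (hypothetical condition).
    # emp_slim = emp_slim.filter(f.col("col4").isNotNull())

    # Repartition both sides on their join keys so matching keys land in the
    # same shuffle partitions; 2000 is only a starting point to tune.
    emp_rep = emp_slim.repartition(2000, "col1")
    mem_rep = mem_slim.repartition(2000, "col2")

    df = emp_rep.join(mem_rep, emp_rep.col1 == mem_rep.col2)

    # If one side is small enough to fit in executor memory, broadcast it
    # instead and skip the shuffle entirely:
    # df = emp_slim.join(f.broadcast(mem_slim), emp_slim.col1 == mem_slim.col2)

    # Cache only if the joined result feeds more than one action.
    df.cache()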

Try tuning Spark properties and applying these optimizations until the execution time comes down to an acceptable value.
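
For instance, a few commonly tuned properties (the values below are placeholders to experiment with, not recommendations for your workload):

    # Placeholder values; tune against your cluster size and data volume.
    spark.conf.set("spark.sql.adaptive.enabled", "true")    # let AQE coalesce/split skewed shuffle partitions
    spark.conf.set("spark.sql.shuffle.partitions", "2000")  # parallelism for the join/groupBy shuffles
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))  # broadcast tables up to ~100 MB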

I'd also recommend checking out the official documentation targeted at optimization and performance tuning: