I have a 58 GB dataframe with a single column called "value", created by reading an unstructured .sql file which is essentially a MySQL dump file. It looks like the sample below.

Please note that the column "value" is shown only partially, as needed for this question. The columns row_idx, DDL_IND_TEMP and DDL_IND were created by me in order to achieve my requirement, which is described below.

+--------------------+-------+------------+-------+
|               value|row_idx|DDL_IND_TEMP|DDL_IND|
+--------------------+-------+------------+-------+
|                    |      1|        NULL|   NULL|
|-- MySQL dump 10....|      2|        NULL|   NULL|
|                  --|      3|        NULL|   NULL|
|-- Host: 192.168....|      4|        NULL|   NULL|
|-- --------------...|      5|        NULL|   NULL|
|-- Server version...|      6|        NULL|   NULL|
|                    |      7|        NULL|   NULL|
|/*!40111 SET @OLD...|     17|        NULL|   NULL|
|                    |     18|        NULL|   NULL|
|-- Table structur...|     22|        NULL|   NULL|
|                  --|     23|        NULL|   NULL|
|                    |     24|        NULL|   NULL|
|DROP TABLE IF EXI...|     25|        NULL|   NULL|
|/*!40101 SET @sav...|     26|        NULL|   NULL|
|/*!40101 SET char...|     27|        NULL|   NULL|
|                    |     28|        NULL|   NULL|
|CREATE TABLE `use...|     29|           1|      1|
|  `id` int(11) NO...|     30|        NULL|      1|
|  `number` varcha...|     31|        NULL|      1|
|  `content` varch...|     32|        NULL|      1|
|  `time` datetime...|     33|        NULL|      1|
|   PRIMARY KEY (`...|     34|        NULL|      1|
|) ENGINE=InnoDB A...|     35|        NULL|      1|
|/*!40101 SET char...|     36|        NULL|      1|
|                    |     37|        NULL|      1|
|                  --|     38|        NULL|      1|
|-- Dumping data f...|     39|           0|      0|
|                  --|     40|        NULL|      0|
|-- Table structur...|     49|        NULL|      0|
|                  --|     50|        NULL|      0|
|                    |     51|        NULL|      0|
|DROP TABLE IF EXI...|     52|        NULL|      0|
|/*!40101 SET @sav...|     53|        NULL|      0|
|/*!40101 SET char...|     54|        NULL|      0|
|                    |     55|        NULL|      0|
|CREATE TABLE `aff...|     56|           1|      1|
|  `id` int(11) NO...|     57|        NULL|      1|
|  `number` varcha...|     58|        NULL|      1|
|  `content` varch...|     59|        NULL|      1|
|  `time` datetime...|     60|        NULL|      1|
|   PRIMARY KEY (`...|     61|        NULL|      1|
|) ENGINE=InnoDB A...|     62|        NULL|      1|
|/*!40101 SET char...|     63|        NULL|      1|
|                    |     64|        NULL|      1|
|                  --|     65|        NULL|      1|
|-- Dumping data f...|     66|           0|      0|
|                  --|     67|        NULL|      0|
|                    |     68|        NULL|      0|
|                    |     84|        NULL|      0|
|-- Dump completed...|     85|        NULL|      0|
+--------------------+-------+------------+-------+

My requirement is to extract the DDL portions that occur throughout the .sql file into a separate text file.

The solution I chose is to create a column called "DDL_IND" that flags all records from the record starting with the string "CREATE TABLE" up to the record that starts with "-- Dumping data". I achieved that using the code below.

data_source = "s3-path-for-the-58GB-file.sql"

df_userlogs = spark.read.text(data_source)

from pyspark.sql.types import StringType,ArrayType,StructType,StructField
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.orderBy("row_idx")

# Assign a sequential row number so the original file order is preserved
df_userlogs1 = df_userlogs.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))

# DDL_IND_TEMP marks only the boundary rows (1 = "CREATE TABLE", 0 = "-- Dumping data");
# DDL_IND forward-fills that flag with last(..., ignorenulls=True) so every row in between keeps it
df_userlogs2 = df_userlogs1.withColumn("DDL_IND_TEMP", F.when(F.col("value").startswith("CREATE TABLE"), F.lit(1))
                                                        .when(F.col("value").startswith("-- Dumping data"), F.lit(0))
                                                        .otherwise(F.lit(None))) \
                           .withColumn("DDL_IND", F.when(F.col("value").startswith("CREATE TABLE"), F.lit(1))
                                                   .when(F.col("value").startswith("-- Dumping data"), F.lit(0))
                                                   .otherwise(F.last("DDL_IND_TEMP", ignorenulls=True).over(w)))
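Once DDL_IND is in place, the remaining step I have in mind is simply to keep the flagged rows and write them back out as text. A minimal sketch of that step, assuming a hypothetical output path:

# Sketch of the intended final step (the output path below is hypothetical):
# keep only the rows flagged as DDL and write them out as plain text
df_ddl = df_userlogs2.filter(F.col("DDL_IND") == 1).select("value")
df_ddl.write.mode("overwrite").text("s3-output-path-for-extracted-ddl/")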

The DDL_IND code above runs fine up to roughly a 25 GB file, but for the 58 GB file it runs into a disk-spill issue during the sort required by the window. Since there is no column to partition on, the window forces a sort over the entire file; the job cannot cope with it and fails with the error below.

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

24/01/23 17:44:58 ERROR TaskMemoryManager: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@385cbc2a
java.io.IOException: No space left on device

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
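To double-check my understanding of why the sort blows up: an orderBy window with no partitionBy pulls all rows into a single partition, which seems easy to confirm on a smaller sample. A rough check of that assumption (the sample path is hypothetical):

# Rough check: a window with orderBy but no partitionBy collapses the data
# into a single partition after the shuffle
df_small = spark.read.text("s3-path-for-a-small-sample.sql")
df_small = df_small.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
print(df_small.rdd.getNumPartitions())  # I would expect this to print 1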

I am currently testing this on EMR Serverless on AWS with application limits of 400 vCPUs, 3000 GB memory and 20 TB disk in total, which I believe is far more than enough for processing a 58 GB file. I am sure I am missing a trick here, which is why it is blowing past the application limits.

What is the best solution for this?
