How do I parse large compressed csv files in Foundry?


I have a large gzipped csv file (.csv.gz) uploaded to a dataset; it's about 14GB compressed and 40GB uncompressed. Is there a way to decompress, read, and write it out to a dataset using Python Transforms without causing the executor to OOM?


There are 2 answers

Accepted answer, from vanhooser:

I'm going to harmonize a few tactics in answering this question.

First, I want to write this test-driven, using the method discussed here, since we are dealing with raw files. Iterating on raw files with full checks + builds would be far too slow, so I'll start off by creating a small sample .csv file and compressing it for much faster development.

My sample .csv file looks like the following:

[screenshot: the sample .csv file]
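The screenshot isn't reproduced here, but based on the test assertions further down, a minimal equivalent is a two-column file with a single data row, for example:

col_1,col_2
a,b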

I then compressed it using command-line utilities and added it to my code repository by cloning the repository to my local machine, adding the file to my development branch, and pushing the result back up into my Foundry instance.
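If you'd rather do the compression from Python instead of the command line, the standard-library gzip module is enough (a quick sketch; the file names are the ones used in this answer):

import gzip
import shutil

# Compress the sample file so it matches what the production dataset contains.
with open("test.csv", "rb") as f_in, gzip.open("test.csv.gz", "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)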

I also made a test directory in my repository as I want to ensure my parsing logic is properly verified.

This resulted in my repository looking like the following:

[screenshot: repository layout showing the new test directory]
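The exact layout depends on your repository template, but it is roughly:

src/
├── myproject/
│   └── datasets/
│       └── parse_gzip.py
└── test/
    ├── test_gzip_csv.py
    └── test.csv.gz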

Protip: don't forget to modify your setup.py and build.gradle files to enable testing and specifically package up your small test file.
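For example, the relevant part of setup.py might look like the following (a sketch: your repository template already provides most of this, the key addition is package_data so the .csv.gz fixture actually ships with the package):

from setuptools import find_packages, setup

setup(
    name="myproject",
    version="0.1.0",
    packages=find_packages(),
    # Ship non-Python fixtures such as test.csv.gz with the package so that
    # pkg_resources.resource_filename() can locate them when the tests run.
    package_data={"": ["*.csv.gz"]},
)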

I also need to make my parsing logic sit outside my my_compute_function method so that it's available to my test methods, so parse_gzip.py looks like the following:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    # Read each compressed .csv with Spark's native csv reader (gzip is
    # decompressed transparently), then union the per-file DataFrames.
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.option("header", "true").csv(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs)
    return output_df


@transform(
    the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
    the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # Build fully qualified paths to every .csv.gz file in the input dataset
    # (note the "/" separator between the dataset root and the relative path).
    files = [f"{hadoop_path}/{file_status.path}" for file_status in input_filesystem.ls('**/*.csv.gz')]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

Consequently, my test_gzip_csv.py file looks like so:

from myproject.datasets import parse_gzip
from pkg_resources import resource_filename


def test_compressed_csv(spark_session):
    # Resolve the packaged test fixture (test.csv.gz) that sits next to this test module.
    file_path = resource_filename(__name__, "test.csv.gz")
    parsed_df = parse_gzip.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"col_1", "col_2"}

It's important to note that this methodology doesn't use the .files() call into the filesystem; it uses the .ls() method to get an iterator of file names. This is deliberate: we don't need to parse the files ourselves inside the executors, we simply hand the compressed file paths to Spark's native .csv reader and let existing functionality do the work.

GZip files are actually splittable, and Spark's own methods for reading these files will serve you better than writing your own decompressor / .csv parser. If you tried to decompress and parse them manually, you would risk OOMing your job and would need to throw more memory at it for it to succeed. At the scale you are operating at, it's also advisable not to process individual files in plain Python, since its performance will not match Spark's.

Note that I also use the transforms.verbs.dataframes.union_many method here to gracefully handle files with different schemas. You can specify 'narrow', 'wide' and 'strict' options to control how schema differences are handled; refer to your product documentation to see which best fits your needs.
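If you'd rather stay in plain PySpark, a rough equivalent of the 'wide' behaviour is unionByName with allowMissingColumns=True (a sketch, assuming Spark 3.1+; columns missing from some inputs are filled with nulls):

from functools import reduce

def union_wide(dfs):
    # Union by column name, padding columns absent from some inputs with nulls.
    return reduce(lambda left, right: left.unionByName(right, allowMissingColumns=True), dfs)

output_df = union_wide(parsed_dfs)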

Answer from Ryan Norris:

There are 2 recommended ways to parse a dataset containing raw CSV files into a tabular dataset inside Foundry.

1. Just add a schema!

Foundry is able to natively understand datasets containing (compressed) csv files. So in some cases a parsing job is not necessary in the first place.

We just need to tell Foundry that we intend for this dataset to be interpreted as tabular data, rather than a generic bucket of files.

Simply navigate to the dataset preview page and click the "Apply a schema" button in the top right corner.

Foundry will make a best guess at the schema, but it can't always do a good job without your help, so you might get columns named untitled_column_1, or with the wrong data types etc. These can be cleaned up manually by clicking "Edit Schema" in the information pane on the left of the app, and using the schema editor dialog.

Note: this is not a robust solution for production pipelines. If new CSVs are added to the dataset which do not conform to the schema, Foundry will not know, and downstream jobs will fail to read the dataset.

2. In a transform

We can use Spark to infer the schema of CSV files inside a Foundry transform job.

See the official platform docs on inferring a schema for more details.

Also copying the code snippet here for posterity:

from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet

@transform(
    output=Output("/Company/sourceA/parsed/data"),
    raw=Input("/Company/sourceA/raw/data_csv"),
)
def read_csv(ctx, raw, output):
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    # Fully qualified paths to every file in the raw dataset
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    df = (
        ctx
        .spark_session
        .read
        .option("encoding", "UTF-8")  # UTF-8 is the default
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
    output.write_dataframe(sanitize_schema_for_parquet(df))

Notice that this is more efficient than the currently accepted SO answer, as it passes all the file paths into spark.read.csv() at once, rather than one at a time and then performing a union.

Note: This is a more robust solution for production pipelines, since if the csv data changes, the schema will update automatically. But downstream datasets could still fail if the schema unexpectedly changes. If the data contains files with a known schema, I would recommend either turning off the inferSchema option and passing an explicit schema to spark.read, or using a schema expectation to fail the build (or notify you) if the schema changes unexpectedly.
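For example, the explicit-schema variant of the transform above might look like this (a sketch; the column names and types are hypothetical placeholders for your real schema, and the dataset paths are reused from the snippet above):

from pyspark.sql import types as T
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet

# Hypothetical schema: replace with the columns you actually expect.
EXPECTED_SCHEMA = T.StructType([
    T.StructField("col_1", T.StringType(), True),
    T.StructField("col_2", T.IntegerType(), True),
])

@transform(
    output=Output("/Company/sourceA/parsed/data"),
    raw=Input("/Company/sourceA/raw/data_csv"),
)
def read_csv(ctx, raw, output):
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    df = (
        ctx.spark_session
        .read
        .option("header", True)
        .schema(EXPECTED_SCHEMA)  # no inferSchema: Spark reads with the declared types
        .csv(files)
    )
    output.write_dataframe(sanitize_schema_for_parquet(df))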

Aside. A note on compression codecs

First up, a warning: in contrast to what is mentioned in the currently accepted answer, .csv.gz files are not splittable. Spark will only be able to process your 40GB of data sequentially on one executor node.

If you have very large single files such as this one, I would recommend re-compressing the data with a splittable codec such as bzip2 (or splitting it into several smaller files). This way Spark will naturally be able to process the large input in parallel chunks.

Note: this is different to parquet files using gzip compression, which are splittable. This is due to the parquet format itself being designed to be splittable regardless of the compression codec used.
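If you are stuck with a single large .csv.gz in the meantime, one common mitigation (a general Spark sketch, not Foundry-specific, written inside a transform body like the ones above) is to repartition straight after the read: the initial decompression still runs in a single task, but everything downstream, including the write, can then use the whole cluster.

df = (
    ctx.spark_session
    .read
    .option("header", True)
    .csv(files)
)
# One task decompresses the gzip stream; repartitioning afterwards spreads the
# rows across the cluster for downstream transformations and the write.
df = df.repartition(200)  # hypothetical partition count: tune to your data volume
output.write_dataframe(df)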