Dask DataFrame: write multiple CSV by column


I have a Dask DataFrame with a column "file". I want to write each row of the dataframe to a CSV whose path is given by the "file" column.

For instance, in the example below, rows 0, 1, and 4 should be written to a.csv; row 2 to b.csv; and rows 3 and 5 to c.csv:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(
    {
        "x": [1, 2, 3, 7, 11, 2],
        "y": [1, 1, 2, 8, 0, 0],
        "file": ["a.csv", "a.csv", "b.csv", "c.csv", "a.csv", "c.csv"],
    }
)
ddf = dd.from_pandas(df, npartitions=2)
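
To make the expected split concrete, here is the grouping on the small pandas frame (just a sanity check on the toy data, not part of the Dask pipeline):

for path, group in df.groupby("file"):
    print(path, group.index.tolist())
# a.csv [0, 1, 4]
# b.csv [2]
# c.csv [3, 5]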

I tried two solutions. Both work, but they are super slow and make the memory crash (even with <100MB chunks and 128GB of total RAM). The priority right now is to make this less expensive in terms of memory, but if you can also make it faster, even better!
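
(For context, a minimal sketch of how I cap partition sizes; this is illustrative rather than my exact code, and partition_size is the knob I mean by "chunks":)

# Cap each partition at roughly 100 MB of in-memory data
ddf = ddf.repartition(partition_size="100MB")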

Bad solution 1

Get each file group and write it with a for loop. Super ugly and super inefficient...

for file in ddf["file"].unique().compute():
    ddf[ddf["file"] == file].to_csv(file, single_file=True)

Bad solution 2

Use map_partitions and group the dataframe within each partition separately.

from pathlib import Path

def _write_partition(df: pd.DataFrame, partition_info=None) -> None:
    # partition_info is None during Dask's meta/inference call, so skip writing then
    if partition_info is not None:
        for file, group_df in df.groupby("file"):
            # Append to the target file, writing the header only if the file is new
            group_df.to_csv(
                file, mode="a", header=not Path(file).exists(), index=False
            )

ddf.map_partitions(_write_partition).compute()
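
One thing I'm unsure about with this version: partitions are written in parallel, so several workers can append to the same file at once, and the Path(file).exists() header check can race with the writes themselves.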

This works on small examples, but with my big dataframe (20 GB) it runs for 3 hours without writing a single line of CSV and then crashes because of memory (even with 128 GB of RAM). I'm quite new to Dask, so maybe I'm doing something wrong...
