I have a Dask DataFrame with a column "file". I want to write each row of the dataframe to a CSV whose path is given by the "file" column.
For instance, in the example below, the rows with indices 0, 1, and 4 should be written to a.csv, row 2 to b.csv, and rows 3 and 5 to c.csv:
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({"x": [1, 2, 3, 7, 11, 2], "y": [1, 1, 2, 8, 0, 0], "file": ["a.csv", "a.csv", "b.csv", "c.csv", "a.csv", "c.csv"]})
ddf = dd.from_pandas(df, npartitions=2)
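For reference, here is a plain-pandas sketch of the behaviour I'm after, using the toy df defined above (illustrative only; this obviously does not scale to my real data):

# Write each group of rows to the CSV named in its "file" column
for path, group in df.groupby("file"):
    group.to_csv(path, index=False)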
I tried two solutions. Both work, but they are super slow and crash from memory exhaustion (even with <100 MB chunks and 128 GB of total RAM). The priority right now is to make this less expensive in terms of memory, but if you can make it faster then it's even better!
Bad solution 1
Get each file group and write it in a for loop. Super ugly, and super inefficient...
for file in ddf["file"].unique().compute():
    # Filtering re-scans the whole dataframe for every distinct file
    ddf[ddf["file"] == file].to_csv(file, single_file=True)
Bad solution 2
Use map_partitions and group the dataframe on each partition separately.
from pathlib import Path

def _write_partition(df: pd.DataFrame, partition_info=None) -> None:
    # partition_info is None during Dask's meta-inference call, so skip that call
    if partition_info is not None:
        for file, group_df in df.groupby("file"):
            # Append to the file, writing the header only if the file does not exist yet
            group_df.to_csv(
                file, mode="a", header=not Path(file).exists(), index=False
            )

ddf.map_partitions(_write_partition).compute()
This works on small examples, but with my big dataframe (20 GB) it runs for 3 hours without writing a single line of CSV and then crashes because of memory (even with 128 GB of RAM). I'm quite new to Dask, so maybe I'm doing something wrong...