Killed/MemoryError when creating a large dask.dataframe from delayed collection

10.4k views Asked by At

I am trying to create a dask.dataframe from a bunch of large CSV files (currently 12 files, 8-10 million lines and 50 columns each). A few of them might fit together into my system memory but all of them at once definitely will not, hence the use of dask instead of regular pandas.

Since reading each csv file involves some extra work (adding columns with data from the file path), I tried creating the dask.dataframe from a list of delayed objects, similar to this example.

This is my code:

import dask.dataframe as dd
from dask.delayed import delayed
import os
import pandas as pd

def read_file_to_dataframe(file_path):
    df = pd.read_csv(file_path)
    df['some_extra_column'] = 'some_extra_value'
    return df

if __name__ == '__main__':
    path = '/path/to/my/files'
    delayed_collection = list()
    for rootdir, subdirs, files in os.walk(path):
        for filename in files:
            if filename.endswith('.csv'):
                file_path = os.path.join(rootdir, filename)
                delayed_reader = delayed(read_file_to_dataframe)(file_path)
                delayed_collection.append(delayed_reader)

    df = dd.from_delayed(delayed_collection)
    print(df.compute())

When starting this script (Python 3.4, dask 0.12.0), it runs for a couple of minutes while my system memory constantly fills up. When it is fully used, everything starts lagging and it runs for some more minutes, then it crashes with killed or MemoryError.

I thought the whole point of dask.dataframe was to be able to operate on larger-than-memory dataframes that span over multiple files on disk, so what am I doing wrong here?

edit: Reading the files instead with df = dd.read_csv(path + '/*.csv') seems to work fine as far as I can see. However, this does not allow me to alter each single dataframe with additional data from the file path.

edit #2: Following MRocklin's answer, I tried to read my data with dask's read_bytes() method as well as using the single-threaded scheduler as well as doing both in combination. Still, even when reading chunks of 100MB in single-threaded mode on a laptop with 8GB of memory, my process gets killed sooner or later. Running the code stated below on a bunch of small files (around 1MB each) of similar shape works fine though. Any ideas what I am doing wrong here?

import dask
from dask.bytes import read_bytes
import dask.dataframe as dd
from dask.delayed import delayed
from io import BytesIO
import pandas as pd

def create_df_from_bytesio(bytesio):
    df = pd.read_csv(bytesio)
    return df

def create_bytesio_from_bytes(block):
    bytesio = BytesIO(block)
    return bytesio


path = '/path/to/my/files/*.csv'

sample, blocks = read_bytes(path, delimiter=b'\n', blocksize=1024*1024*100)
delayed_collection = list()
for datafile in blocks:
    for block in datafile:
        bytesio = delayed(create_bytesio_from_bytes)(block)
        df = delayed(create_df_from_bytesio)(bytesio)
        delayed_collection.append(df)

dask_df = dd.from_delayed(delayed_collection)
print(dask_df.compute(get=dask.async.get_sync))
1

There are 1 answers

4
MRocklin On BEST ANSWER

If each of your files is large then a few concurrent calls to read_file_to_dataframe might be flooding memory before Dask ever gets a chance to be clever.

Dask tries to operate in low memory by running functions in an order such that it can delete intermediate results quickly. However if the results of just a few functions can fill up memory then Dask may never have a chance to delete things. For example if each of your functions produced a 2GB dataframe and if you had eight threads running at once, then your functions might produce 16GB of data before Dask's scheduling policies can kick in.

Some options

Use dask.bytes.read_bytes

The reason why read_csv works is that it chunks up large CSV files into many ~100MB blocks of bytes (see the blocksize= keyword argument). You could do this too, although it's tricky because you need to always break on an endline.

The dask.bytes.read_bytes function can help you here. It can convert a single path into a list of delayed objects, each corresponding to a byte range of that file that starts and stops cleanly on a delimiter. You would then put these bytes into an io.BytesIO (standard library) and call pandas.read_csv on that. Beware that you'll also have to handle headers and such. The docstring to that function is extensive and should provide more help.

Use a single thread

In the example above everything would be fine if we didn't have the 8x multiplier from parallelism. I suspect that if you only ran a single function at once that things would probably pipeline without ever reaching your memory limit. You can set dask to use only a single thread with the following line

dask.set_options(get=dask.async.get_sync)

Note: For Dask versions >= 0.15, you need to use dask.local.get_sync instead.

Make sure that results fit in memory (response to edit 2)

If you make a dask.dataframe and then compute it immediately

ddf = dd.read_csv(...)
df = ddf.compute()

You're loading in all of the data into a Pandas dataframe, which will eventually blow up memory. Instead it's better to operate on the Dask dataframe and only compute on small results.

# result = df.compute()  # large result fills memory
result = df.groupby(...).column.mean().compute()  # small result

Convert to a different format

CSV is a pervasive and pragmatic format, but also has some flaws. You might consider a data format like HDF5 or Parquet.