Dask DataFrame GroupBy.size() raises MemoryError

191 views Asked by At

I have two large CSV files of ~28 million rows each. I am performing an inner join, adding columns to the resulting Dask DataFrame, then calling GroupBy.size() on certain columns to return a count. In the example, the inputs come from two Parquet files that were generated from the original CSVs.

The end-to-end program works on an 8-core / 32 GB RAM computer and produces a 4x6 pandas DataFrame of the group-by sizes, but when running on 16 GB and 10 GB RAM devices, I get a memory error.

What can I do to avoid this memory error?

Here is the code in question:

def merge(ubs_dd, br_dd):
    """Inner-join ``ubs_dd`` and ``br_dd`` on ``mabid`` == ``brid``.

    Column names present on both sides are disambiguated with the
    ``_ubs`` and ``_br`` suffixes respectively.
    """
    # NOTE: joining on these columns was observed to be slow; an index-aligned
    # merge (left_index=True, right_index=True) was observed to be fast.
    join_options = {
        'left_on': 'mabid',
        'right_on': 'brid',
        'how': 'inner',
        'suffixes': ('_ubs', '_br'),
    }
    joined = dd.merge(ubs_dd, br_dd, **join_options)
    return joined

def reconcile(merged_dd):
    """Add reconciliation columns comparing the ``_ubs`` and ``_br`` sides.

    The columns are assigned on ``merged_dd`` itself and that same object is
    returned.

    Added columns:
        amount_different: ``AMOUNT_ubs - AMOUNT_br``, both cast to float.
        amount_break: True where the absolute difference is >= 1.
        billable_break: True where ``BILLABLE_ubs`` and ``BILLABLE_br`` differ.
        eligible_break: True where ``ELIGIBLE_ubs`` and ``ELIGIBLE_br`` differ.
    """
    amount_ubs = merged_dd['AMOUNT_ubs'].astype(float)
    amount_br = merged_dd['AMOUNT_br'].astype(float)
    merged_dd['amount_different'] = amount_ubs - amount_br
    # +/- $1 tolerance: a difference of exactly 1 already counts as a break.
    merged_dd['amount_break'] = merged_dd['amount_different'].abs() >= 1

    # A "break" is a mismatch between the two sides, so these flags use `!=`
    # to agree with amount_break (True means the sides disagree).
    merged_dd['billable_break'] = merged_dd['BILLABLE_ubs'] != merged_dd['BILLABLE_br']
    merged_dd['eligible_break'] = merged_dd['ELIGIBLE_ubs'] != merged_dd['ELIGIBLE_br']

    return merged_dd

def metrics_report(merged_dd):
    """Count rows for each combination of the three break flags.

    Groups by ``amount_break``, ``billable_break`` and ``eligible_break``,
    takes the group sizes, moves the group keys back into columns, names the
    size column ``count`` and returns the result of ``.compute()``.
    """
    break_flags = ['amount_break', 'billable_break', 'eligible_break']
    group_sizes = merged_dd.groupby(break_flags).size()
    # The rename targets the column labelled 0, i.e. the unnamed size column.
    counts = group_sizes.reset_index().rename(columns={0: 'count'})
    return counts.compute()


# Pipeline: join the two inputs, derive the break columns, then count rows per
# combination of break flags.
merged_dd = merge(ubs_dd, br_dd)
merged_dd = reconcile(merged_dd)
# metrics_report() calls .compute() on the grouped result.
metrics = metrics_report(merged_dd)

When running on low-memory devices, here is the error I receive at about 70% completion:

generating final outputs
[############################            ] | 70% Completed | 29min 19.5s
Traceback (most recent call last):
  File "c:/Users/<>/git/repository/<>/wma_billing_rec.py", line 155, in <module>
    metrics = metrics_report(merged_dd)
  File "c:/Users/<>/git/repository/<>/wma_billing_rec.py", line 115, in metrics_report
    return merged_dd.groupby(['amount_break', 'billable_break', 'eligible_break']).size().reset_index().rename(columns={0:'count'}).compute()
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\threaded.py", line 84, in get
    **kwargs
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 316, in reraise
    raise exc
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\dataframe\shuffle.py", line 780, in collect
    res = p.get(part)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py", line 73, in get
    return self.get([keys], **kwargs)[0]
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py", line 79, in get
    return self._get(keys, **kwargs)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\encode.py", line 28, in _get
    raw = self.partd._get(keys, **kwargs)
  File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\buffer.py", line 54, in _get
    self.slow.get(keys, lock=False)))
MemoryError
0

There are 0 answers