I have two large CSV files, ~28 million rows each. I am performing an inner join, adding columns to the resulting Dask DataFrame, and then requesting a groupby().size() on certain columns to return a count. In this example the inputs come from two Parquet files, which were generated from the original CSVs.
The end-to-end program works on an 8-core / 32 GB RAM machine and produces a 4x6 pandas DataFrame of the groupby sizes, but when I run it on machines with 16 GB or 10 GB of RAM, I get a MemoryError.
What can I do to avoid this memory error?
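For context, the two input DataFrames are loaded roughly like this (a sketch; the file paths are placeholders, but ubs_dd and br_dd are the names used below):

import dask.dataframe as dd

# hypothetical loading step -- actual paths and read options differ
ubs_dd = dd.read_parquet('ubs.parquet')  # ~28M rows
br_dd = dd.read_parquet('br.parquet')    # ~28M rows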
Here is the code in question:
def merge(ubs_dd, br_dd):
    return dd.merge(ubs_dd, br_dd, left_on='mabid', right_on='brid', how='inner', suffixes=('_ubs', '_br'))  # slow
    # return dd.merge(ubs_dd, br_dd, left_index=True, right_index=True)  # fast

def reconcile(merged_dd):
    merged_dd['amount_different'] = merged_dd['AMOUNT_ubs'].astype(float) - merged_dd['AMOUNT_br'].astype(float)
    merged_dd['amount_break'] = merged_dd['amount_different'].abs() >= 1  # +/- $1 tolerance
    merged_dd['billable_break'] = merged_dd['BILLABLE_ubs'] == merged_dd['BILLABLE_br']
    merged_dd['eligible_break'] = merged_dd['ELIGIBLE_ubs'] == merged_dd['ELIGIBLE_br']
    return merged_dd

def metrics_report(merged_dd):
    return merged_dd.groupby(['amount_break', 'billable_break', 'eligible_break']).size().reset_index().rename(columns={0: 'count'}).compute()

merged_dd = merge(ubs_dd, br_dd)
merged_dd = reconcile(merged_dd)
metrics = metrics_report(merged_dd)
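The commented-out "fast" merge variant assumes both frames have already been indexed on the join key; roughly something like this (a sketch of that pre-indexing step, not what I'm currently running):

# pre-index both frames so the merge can align on the index;
# set_index triggers a one-time shuffle of each frame
ubs_dd = ubs_dd.set_index('mabid')
br_dd = br_dd.set_index('brid')
merged_dd = dd.merge(ubs_dd, br_dd, left_index=True, right_index=True, suffixes=('_ubs', '_br'))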
When I run it on the lower-memory machines, here is the error I receive at around 70% completion:
generating final outputs
[############################ ] | 70% Completed | 29min 19.5s
Traceback (most recent call last):
File "c:/Users/<>/git/repository/<>/wma_billing_rec.py", line 155, in <module>
metrics = metrics_report(merged_dd)
File "c:/Users/<>/git/repository/<>/wma_billing_rec.py", line 115, in metrics_report
return merged_dd.groupby(['amount_break', 'billable_break', 'eligible_break']).size().reset_index().rename(columns={0:'count'}).compute()
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\threaded.py", line 84, in get
**kwargs
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 486, in get_async
raise_exception(exc, tb)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 316, in reraise
raise exc
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 222, in execute_task
result = _execute_task(task, data)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\dataframe\shuffle.py", line 780, in collect
res = p.get(part)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py", line 73, in get
return self.get([keys], **kwargs)[0]
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py", line 79, in get
return self._get(keys, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\encode.py", line 28, in _get
raw = self.partd._get(keys, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\buffer.py", line 54, in _get
self.slow.get(keys, lock=False)))
MemoryError