Dask client causing memory exception?

183 views Asked by At

I've been migrating some of my code from pandas to dask. Much of it is already working. While studying dask a little more, I came across its "Client()" feature for monitoring usage. Nice function! But it breaks my code!

Dask without client

import dask.dataframe as dd
from dask.distributed import Client
from datetime import date, timedelta, datetime

def carregar ():
    """Read the 'Unificado' parquet dataset and fill in missing values.

    The dataset is read from the ``Unificado`` folder under ``pastafonte``.
    Missing values are replaced column by column:

    * ``parceiro``: a new category ``'Not_available'`` is added and used
      as the fill value.
    * ``mci``: filled with ``0``.
    * ``sku``: filled with the string form of ``marca``.
    * ``cod_transacao``: filled with ``'Not_available'``.

    Returns:
        The Dask DataFrame with the filled columns.
    """
    origem = f'{pastafonte}\\Unificado'
    df = dd.read_parquet(path=origem, parse_dates=['data'])

    # The fill value has to exist as a category before fillna can use it.
    parceiro_ampliado = df.parceiro.cat.add_categories('Not_available')
    df['parceiro'] = parceiro_ampliado.fillna('Not_available')

    df['mci'] = df['mci'].fillna(0)

    marca_texto = df['marca'].astype(str)
    df['sku'] = df['sku'].fillna(marca_texto)

    df['cod_transacao'] = df['cod_transacao'].fillna('Not_available')
    return df
# Build the cleaned Dask DataFrame that every aggregation below starts from.
dftotal = carregar()
# Keep every non-Amazon row, plus the Amazon rows whose status is 'indefinido'.
# NOTE(review): `gerado` is not referenced again in this snippet — confirm it is still needed.
gerado = dftotal.loc[(dftotal.parceiro != 'Amazon') | ((dftotal.parceiro == 'Amazon') & (dftotal.status == 'indefinido'))] 

# Sum gmv, receita and cashback per produto / parceiro / marca / month / status.
df = dftotal.groupby(['produto','parceiro', 'marca',dftotal.data.dt.to_period("M"), 'status'], dropna=False, observed=True).aggregate({'gmv': 'sum', 'receita': 'sum', 'cashback': 'sum'}).reset_index()

# 'Afiliados' rows: count the distinct transaction codes in each group.
dfafiliados = dftotal.loc[(dftotal.produto == 'Afiliados')]
dfafiliados = dfafiliados.groupby(['produto','parceiro', 'marca',dfafiliados.data.dt.to_period("M"), 'status'], dropna=False, observed=True)['cod_transacao'].nunique().reset_index()
dfafiliados = dfafiliados.rename(columns={ 'cod_transacao' : 'qtde_vendas'})
dfafiliados = dfafiliados.loc[dfafiliados.qtde_vendas != 0] # Stands in for "observed=True", which is not working

# Every other product: count the transaction-code entries in each group instead.
dfoutros = dftotal.loc[(dftotal.produto != 'Afiliados')]
dfoutros.cod_transacao = dfoutros.cod_transacao.fillna('Não disponível')
dfoutros = dfoutros.groupby(['produto','parceiro', 'marca',dfoutros.data.dt.to_period("M"), 'status'], dropna=False, observed=True).aggregate({'cod_transacao' :'count'}).reset_index()
dfoutros = dfoutros.rename(columns={ 'cod_transacao' : 'qtde_vendas'})

# Stack both sales-count tables row-wise into a single table.
dfnunique = dd.concat([dfafiliados,dfoutros], axis=0)#, ignore_order = True

# Attach the sales counts to the summed totals; the left join keeps every totals row.
df = df.merge(dfnunique,how='left', on=['produto','parceiro', 'marca','data', 'status'])

# Cast the monthly-period 'data' key to datetime64[ns].
df['data'] = df['data'].astype({'data' : 'datetime64[ns]'}) # type: ignore
# Select and order the output columns.
df = df[['produto','parceiro','marca','data','status','qtde_vendas','gmv','receita','cashback']]
# Run the computation, then sort the computed result by the grouping keys.
df = df.compute()
df = df.sort_values( by = ['produto','parceiro', 'marca','data','status'])

And it all runs ok in about 30 seconds.

But if I run the same code, except for adding the client, it runs out of memory:

# Bind the instance to a lowercase name so the imported `Client` class is not
# overwritten and can still be instantiated again later in the session.
client = Client()
client

2023-10-24 23:22:14,645 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52395 (pid=17668) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:15,090 - distributed.nanny - WARNING - Restarting worker
2023-10-24 23:22:15,363 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52401 (pid=4388) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:15,584 - distributed.nanny - WARNING - Restarting worker
2023-10-24 23:22:19,561 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52402 (pid=26016) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:19,778 - distributed.nanny - WARNING - Restarting worker
2023-10-24 23:22:21,731 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52398 (pid=18488) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:22,074 - distributed.nanny - WARNING - Restarting worker

I've tried to create a sample dataframe to reproduce the error, but I couldn't (I don't know whether I would need to test with a much larger sample). Even with a smaller dataframe, though, I can see a warning when using the client that does not appear without it.

Without client

from datetime import datetime
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
import numpy as np

# Number of rows in the synthetic dataset.
num_variables = 1_000_000
rng = np.random.default_rng()

# Synthetic data: random integer ids, random dates picked from 2021-01-01 to
# 2022-12-31, random product and brand labels, and two random float columns
# scaled by 100.
# NOTE(review): 'brand' mixes strings with np.nan inside np.random.choice —
# the missing value may come out as the string 'nan'; confirm.
data = pd.DataFrame({
    'id' :  np.random.randint(1,99999,num_variables),
    'date' : [np.random.choice(pd.date_range(datetime(2021,1,1),datetime(2022,12,31))) for i in range(num_variables)],
    'product' : [np.random.choice(['giftcards', 'afiliates']) for i in range(num_variables)],
    'brand' : [np.random.choice(['brand_1', 'brand_2', 'brand_4', 'brand_6', np.nan]) for i in range(num_variables)],
    'gmv' : rng.random(num_variables) * 100,
    'revenue' : rng.random(num_variables) * 100})

# Make the two label columns categorical, then split the frame into 5 Dask partitions.
data = data.astype({'product': 'category', 'brand':'category'})
ddf = dd.from_pandas(data, npartitions=5)

# Count ids per month / product / brand and run the computation.
df = ddf.groupby([ddf.date.dt.to_period('M'), 'product','brand'], dropna=False, observed=True).aggregate({'id' : 'count'}).reset_index()
df = df.compute()

Runs in 0.0 seconds!

Now with client

from datetime import datetime
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
import numpy as np

# Bind the instance to a lowercase name so the imported `Client` class is not
# overwritten and can still be instantiated again later in the session.
client = Client()
client

# Number of rows in the synthetic dataset.
num_variables = 1_000_000
rng = np.random.default_rng()

# Synthetic data: random integer ids, random dates picked from 2021-01-01 to
# 2022-12-31, random product and brand labels, and two random float columns
# scaled by 100.
# NOTE(review): 'brand' mixes strings with np.nan inside np.random.choice —
# the missing value may come out as the string 'nan'; confirm.
data = pd.DataFrame({
    'id' :  np.random.randint(1,99999,num_variables),
    'date' : [np.random.choice(pd.date_range(datetime(2021,1,1),datetime(2022,12,31))) for i in range(num_variables)],
    'product' : [np.random.choice(['giftcards', 'afiliates']) for i in range(num_variables)],
    'brand' : [np.random.choice(['brand_1', 'brand_2', 'brand_4', 'brand_6', np.nan]) for i in range(num_variables)],
    'gmv' : rng.random(num_variables) * 100,
    'revenue' : rng.random(num_variables) * 100})

# Make the two label columns categorical, then split the frame into 5 Dask partitions.
# NOTE(review): the in-memory pandas frame is presumably what the "large graph"
# warning quoted after this snippet refers to — confirm.
data = data.astype({'product': 'category', 'brand':'category'})
ddf = dd.from_pandas(data, npartitions=5)

# Count ids per month / product / brand and run the computation.
df = ddf.groupby([ddf.date.dt.to_period('M'), 'product','brand'], dropna=False, observed=True).aggregate({'id' : 'count'}).reset_index()
df = df.compute()

UserWarning: Sending large graph of size 28.62 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(

Thanks in advance!

0

There are 0 answers