I've been migrating some of my code from pandas to Dask, and much of it already works. While studying Dask a bit more, I came across the Client() feature for monitoring resource usage. Nice feature! But it breaks my code!
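For context, this is the usual pattern for starting a local client, as far as I understand it (a minimal sketch; the dashboard URL varies per machine):

from dask.distributed import Client

# Client() with no arguments starts a LocalCluster on this machine
client = Client()
print(client.dashboard_link)  # URL of the monitoring dashboard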
Dask without a client:
import dask.dataframe as dd
from dask.distributed import Client
from datetime import date, timedelta, datetime
def carregar():
    # Load the unified parquet dataset and normalize missing values
    df = dd.read_parquet(path=f'{pastafonte}\\Unificado', parse_dates=['data'])
    df['parceiro'] = df.parceiro.cat.add_categories('Not_available').fillna('Not_available')
    df['mci'] = df['mci'].fillna(0)
    df['sku'] = df['sku'].fillna(df['marca'].astype(str))
    df['cod_transacao'] = df['cod_transacao'].fillna('Not_available')
    return df
dftotal = carregar()
gerado = dftotal.loc[(dftotal.parceiro != 'Amazon') | ((dftotal.parceiro == 'Amazon') & (dftotal.status == 'indefinido'))]
df = dftotal.groupby(['produto','parceiro', 'marca',dftotal.data.dt.to_period("M"), 'status'], dropna=False, observed=True).aggregate({'gmv': 'sum', 'receita': 'sum', 'cashback': 'sum'}).reset_index()
dfafiliados = dftotal.loc[(dftotal.produto == 'Afiliados')]
dfafiliados = dfafiliados.groupby(['produto','parceiro', 'marca',dfafiliados.data.dt.to_period("M"), 'status'], dropna=False, observed=True)['cod_transacao'].nunique().reset_index()
dfafiliados = dfafiliados.rename(columns={ 'cod_transacao' : 'qtde_vendas'})
dfafiliados = dfafiliados.loc[dfafiliados.qtde_vendas != 0]  # workaround for observed=True, which isn't working
dfoutros = dftotal.loc[(dftotal.produto != 'Afiliados')]
dfoutros['cod_transacao'] = dfoutros['cod_transacao'].fillna('Not_available')
dfoutros = dfoutros.groupby(['produto','parceiro', 'marca',dfoutros.data.dt.to_period("M"), 'status'], dropna=False, observed=True).aggregate({'cod_transacao' :'count'}).reset_index()
dfoutros = dfoutros.rename(columns={ 'cod_transacao' : 'qtde_vendas'})
dfnunique = dd.concat([dfafiliados, dfoutros], axis=0)  # , ignore_order=True
df = df.merge(dfnunique,how='left', on=['produto','parceiro', 'marca','data', 'status'])
df['data'] = df['data'].astype('datetime64[ns]')  # type: ignore
df = df[['produto','parceiro','marca','data','status','qtde_vendas','gmv','receita','cashback']]
df = df.compute()
df = df.sort_values( by = ['produto','parceiro', 'marca','data','status'])
And it all runs fine in about 30 seconds.
But if I run the same code, except for adding a client, it runs out of memory:
client = Client()
client
2023-10-24 23:22:14,645 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52395 (pid=17668) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:15,090 - distributed.nanny - WARNING - Restarting worker
2023-10-24 23:22:15,363 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52401 (pid=4388) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:15,584 - distributed.nanny - WARNING - Restarting worker
2023-10-24 23:22:19,561 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52402 (pid=26016) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:19,778 - distributed.nanny - WARNING - Restarting worker
2023-10-24 23:22:21,731 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:52398 (pid=18488) exceeded 95% memory budget. Restarting...
2023-10-24 23:22:22,074 - distributed.nanny - WARNING - Restarting worker
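Since the workers keep blowing their memory budget, I wonder whether the cluster just needs explicit limits. Client forwards keyword arguments to LocalCluster, so a sketch like this should cap workers and memory (the values are placeholders, not a recommendation):

from dask.distributed import Client

# Fewer workers, each with an explicit memory budget; placeholder values
client = Client(n_workers=2, threads_per_worker=2, memory_limit='4GB')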
I've tried to create a sample dataframe to reproduce the error but couldn't (maybe I'd need a much larger sample). Even with a smaller dataframe, though, I can see a warning with the client that doesn't appear without it.
Without a client:
from datetime import datetime
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
import numpy as np
num_variables = 1_000_000
rng = np.random.default_rng()
data = pd.DataFrame({
'id' : np.random.randint(1,99999,num_variables),
'date' : [np.random.choice(pd.date_range(datetime(2021,1,1),datetime(2022,12,31))) for i in range(num_variables)],
'product' : [np.random.choice(['giftcards', 'afiliates']) for i in range(num_variables)],
'brand' : [np.random.choice(['brand_1', 'brand_2', 'brand_4', 'brand_6', np.nan]) for i in range(num_variables)],
'gmv' : rng.random(num_variables) * 100,
'revenue' : rng.random(num_variables) * 100})
data = data.astype({'product': 'category', 'brand':'category'})
ddf = dd.from_pandas(data, npartitions=5)
df = ddf.groupby([ddf.date.dt.to_period('M'), 'product','brand'], dropna=False, observed=True).aggregate({'id' : 'count'}).reset_index()
df = df.compute()
Runs in 0.0 seconds!
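If I understand the docs correctly, without a client compute() runs on Dask's local scheduler, and that scheduler can also be pinned explicitly (a sketch reusing ddf from the block above):

# Same aggregation, but with the local threaded scheduler selected explicitly
df = ddf.groupby([ddf.date.dt.to_period('M'), 'product', 'brand'],
                 dropna=False, observed=True).aggregate({'id': 'count'}).reset_index()
df = df.compute(scheduler='threads')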
Now with a client:
from datetime import datetime
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
import numpy as np
client = Client()
client
num_variables = 1_000_000
rng = np.random.default_rng()
data = pd.DataFrame({
'id' : np.random.randint(1,99999,num_variables),
'date' : [np.random.choice(pd.date_range(datetime(2021,1,1),datetime(2022,12,31))) for i in range(num_variables)],
'product' : [np.random.choice(['giftcards', 'afiliates']) for i in range(num_variables)],
'brand' : [np.random.choice(['brand_1', 'brand_2', 'brand_4', 'brand_6', np.nan]) for i in range(num_variables)],
'gmv' : rng.random(num_variables) * 100,
'revenue' : rng.random(num_variables) * 100})
data = data.astype({'product': 'category', 'brand':'category'})
ddf = dd.from_pandas(data, npartitions=5)
df = ddf.groupby([ddf.date.dt.to_period('M'), 'product','brand'], dropna=False, observed=True).aggregate({'id' : 'count'}).reset_index()
df = df.compute()
UserWarning: Sending large graph of size 28.62 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
warnings.warn(
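Following the warning's advice literally, I believe scattering would look something like this, though I'm not sure it's the intended pattern for data created with from_pandas (a sketch; from_delayed accepts distributed futures):

from dask.distributed import Client
import dask.dataframe as dd

client = Client()
# Ship the pandas frame to the cluster once, instead of embedding it in
# the task graph on every submission
data_future = client.scatter(data)
# Build a (single-partition) Dask DataFrame from the scattered future
ddf = dd.from_delayed([data_future], meta=data.head(0))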
Thanks in advance!