I am new to Dask and am looking to find a way to flatten a dictionary column in a PANDAS dataframe. Here is a screenshot of the first row of a 16 million-row dataframe:
And here is a sample of the text from three rows:
{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}
I would normally flatten the Form990PartVIISectionAGrp column with the following code:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)
I'm looking to do this in Dask but am getting the following error: "ValueError: The columns in the computed data do not match the columns in the provided metadata."
I am using Python 2.7. I import the relevant packages
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()
To test the code, I created a random sample of the data:
dfs = df.sample(1000)
And then generate the Dask dataframe:
ddf = dd.from_pandas(dfs, npartitions=nCores)
The column is currently in string format so I convert it to a dictionary. Normally, I would just write one line of code:
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
But I've instead tried to do it here in a more 'Dask-like' form so I write the following function and then apply it:
def make_dict(dfs):
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
return dfs
ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()
This works -- it returns a PANDAS dataframe where the Form990PartVIISectionAGrp column is in dictionary format (it's not any faster than the non-Dask apply, however).
I then re-create the Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
And write a function to flatten the column:
def flatten(ddf_out):
ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
#ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
return ddf_out
If I then run this code:
result = ddf.map_partitions(flatten)
I get the following output, where the column has not been flattened:
I was also getting errors about missing meta data, and given that the above did not help parse the dictionary column, so I created a list of the columns produced by the plain Python flattening of the columns and used that to create a dictionary of the columns and data types:
metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}
I then apply the flatten function with this meta data:
result = ddf.map_partitions(flatten, meta=metadir)
I get the following output as result:
Running result.columns yields this:
Where this fails is in running compute(), where I get the following error message: "ValueError: The columns in the computed data do not match the columns in the provided metadata." I get the same error whether I write:
result.compute()
or
result.compute(meta=metadir)
I'm not sure what I'm doing wrong here. The columns in result seem to match those in metadir. Any suggestions would be greatly appreciated.
UPDATE: Here is my stab at updating the flatten function.
meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER'], dtype="O")
def flatten(ddf_out):
ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
for m in meta:
if m not in ddf_out:
df[m] = ''
return ddf_out
Then I run:
result = ddf.map_partitions(flatten, meta=meta).compute()
A couple of notes to get started:
Wouldn't this be better as
map
?ddf_out
was already a dask dataframe, I don't know why you had to do this.The value of
result.columns
is taken from the meta you provided, no computing happens until you ask for it (dask is lazy in most operations). Does the ValueError exception not provide further information?Here is a full example
How did I know what
meta
to use? I applied the function to the pandas dataframe - you might use a small piece of the dataframe to do this.Some additional notes:
dd.read_csv
, and aggregating or writing with dask functions too. Onlycompute()
on something that will be small or not return anything (because it involves writing the output). The official examples do not use from_pandas.