I am new to Dask and am looking to find a way to flatten a dictionary column in a PANDAS dataframe. Here is a screenshot of the first row of a 16 million-row dataframe:

[screenshot of first two rows of data]

And here is a sample of the text from three rows:

{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}

I would normally flatten the Form990PartVIISectionAGrp column with the following code:

    df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)

I'm looking to do this in Dask but am getting the following error: "ValueError: The columns in the computed data do not match the columns in the provided metadata."

I am using Python 2.7. I import the relevant packages:

    import pandas as pd
    from ast import literal_eval   # used below to parse the string column into dicts
    from dask import dataframe as dd
    from dask.multiprocessing import get
    from multiprocessing import cpu_count
    nCores = cpu_count()

To test the code, I created a random sample of the data:

    dfs = df.sample(1000)

And then generate the Dask dataframe:

    ddf = dd.from_pandas(dfs, npartitions=nCores)

The column is currently in string format, so I convert it to a dictionary. Normally, I would just write one line of code:

    dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)

Instead, I've tried to do it here in a more 'Dask-like' way, so I write the following function and then apply it:

    def make_dict(dfs):
        dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)   
        return dfs
    ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()

This works -- it returns a PANDAS dataframe where the Form990PartVIISectionAGrp column is in dictionary format (it's not any faster than the non-Dask apply, however).

    ddf_out
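
A quick check (a sketch added for illustration, not part of the original notebook) confirms the conversion:

    type(ddf_out)                                        # pandas.core.frame.DataFrame
    type(ddf_out['Form990PartVIISectionAGrp'].iloc[0])   # dict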

I then re-create the Dask DF:

    ddf = dd.from_pandas(ddf_out, npartitions=nCores)

And write a function to flatten the column:

    def flatten(ddf_out):
        ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        #ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
        return ddf_out

If I then run this code:

    result = ddf.map_partitions(flatten)

I get the following output, where the column has not been flattened:

    result

I was also getting errors about missing metadata. Since the above did not help parse the dictionary column, I created a list of the columns produced by the plain Python flattening and used it to build a dictionary of the column names and data types:

    metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
               u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
               u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
               u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
               u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
               u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
               u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}

I then apply the flatten function with this metadata:

    result = ddf.map_partitions(flatten, meta=metadir)

I get the following output as result:

    result

Running result.columns yields this:

    result.columns

This fails, however, when I run compute(), which produces the following error message: "ValueError: The columns in the computed data do not match the columns in the provided metadata." I get the same error whether I write:

    result.compute()

or

    result.compute(meta=metadir)

I'm not sure what I'm doing wrong here. The columns in result seem to match those in metadir. Any suggestions would be greatly appreciated.
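
For what it is worth, a hypothetical check (using the flatten function and metadir defined above) is to run flatten on the plain pandas data and compare its output against the meta; as far as I understand, Dask compares the column order as well as the names:

    checked = flatten(ddf_out.copy())             # plain pandas, no Dask involved
    print(set(checked.columns) ^ set(metadir))    # names present in one but not the other
    print(list(checked.columns))                  # compare the ordering against the meta as well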

UPDATE: Here is my stab at updating the flatten function.

    meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
     'F9_07_PZ_DIRTRSTKEY_NAME',
     'F9_07_PZ_COMP_OTHER',
     'F9_07_PZ_COMP_RELATED',
     'F9_07_PZ_TITLE',
     'F9_07_PZ_AVE_HOURS_WEEK',
     'F9_07_PZ_COMP_DIRECT',
     'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
     'F9_07_PC_OFFICER',
     'F9_07_PC_HIGH_COMP_EMPLOYEE',
     'BusinessName',
     'F9_07_PC_KEY_EMPLOYEE',
     'F9_07_PC_TRUSTEE_INSTITUTIONAL',
     'NameBusiness',
     'F9_07_PC_FORMER'], dtype="O")

    def flatten(ddf_out):
        ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        for m in meta:
            if m not in ddf_out:
                ddf_out[m] = ''
        return ddf_out

Then I run:

    result = ddf.map_partitions(flatten, meta=meta).compute()

2 Answers

Answer from mdurant (accepted solution):

A couple of notes to get started:

.apply(literal_eval)

Wouldn't this be better as map?
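
For example, a minimal sketch of that suggestion, using the column name from the question (map also accepts a meta hint on a dask series):

    from ast import literal_eval

    # element-wise conversion with map on the pandas sample
    dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].map(literal_eval)

    # or lazily on the dask dataframe itself
    ddf['Form990PartVIISectionAGrp'] = ddf['Form990PartVIISectionAGrp'].map(
        literal_eval, meta=('Form990PartVIISectionAGrp', 'object'))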

I then re-create the Dask DF:

ddf = dd.from_pandas(ddf_out, npartitions=nCores)

ddf_out only became a pandas dataframe because you called .compute(); if you leave the compute off, it stays a (lazy) dask dataframe and there is no need to convert it again.
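
A sketch of what I mean, keeping everything lazy until a single compute() at the end (flatten and a matching meta as discussed in the question and below):

    ddf = dd.from_pandas(dfs, npartitions=nCores)
    ddf = ddf.map_partitions(make_dict, meta=dfs[:0])   # still a lazy dask dataframe
    result = ddf.map_partitions(flatten, meta=meta)     # meta must match what flatten really returns
    out = result.compute()                              # materialise once, at the end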

The columns in result seem to match those in metadir.

The value of result.columns is taken from the meta you provided; no computing happens until you ask for it (dask is lazy in most operations). Does the ValueError exception not provide further information?
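
If it does not, a sketch of one way to see the actual columns (assuming the ddf, flatten and result from the question) is to pull one partition back into pandas, run the same function on it, and compare against result.columns, which only reflects the provided meta:

    piece = ddf.get_partition(0).compute()   # one partition as a pandas dataframe
    print(flatten(piece).columns)            # what the function actually produces
    print(result.columns)                    # what the provided meta promised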

Here is a full example:

    x = ({'F9_07_PZ_COMP_DIRECT': '0',
      'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
      'F9_07_PZ_COMP_OTHER': '0',
      'F9_07_PZ_COMP_RELATED': '0',
      'F9_07_PZ_TITLE': 'CHAIR PERSON',
      'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
      'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'},
     {'F9_07_PZ_COMP_DIRECT': '0',
      'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
      'F9_07_PZ_COMP_OTHER': '0',
      'F9_07_PZ_COMP_RELATED': '0',
      'F9_07_PZ_TITLE': 'VICE CHAIR',
      'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
      'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'})
    df = pd.DataFrame({'a': x})
    d = dd.from_pandas(df, 1)
    meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT',
           'F9_07_PZ_DIRTRSTKEY_NAME',
           'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
           'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
    d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()

How did I know what meta to use? I applied the function to the pandas dataframe - you might use a small piece of the dataframe to do this.
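
In terms of the example above, a sketch of deriving the meta that way rather than typing it out by hand:

    # build meta by applying the same function to a small pandas piece
    meta = df.head(2).a.apply(pd.Series).iloc[:0]
    d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()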

Some additional notes:

  • it is an anti-pattern to load the data with pandas, pass it to dask workers, and then gather the whole result back into a pandas (in-memory) dataframe: you are very unlikely to see a speedup this way and may incur a lot of overhead. You are better off loading with something like dd.read_csv, and aggregating or writing with dask functions too. Only compute() on something that will be small or that returns nothing (because it involves writing the output). The official examples do not use from_pandas.
  • string and dict processing are pure-Python operations, and so hold the interpreter lock (GIL); threads will not actually run in parallel. To get parallelism, you need to run in processes, which is most easily achieved with the distributed scheduler: https://docs.dask.org/en/latest/setup/single-distributed.html (a minimal sketch follows this list)
  • the distributed scheduler also gives you access to the dashboard, which has a lot of useful information to diagnose how your system is running. You can also configure a lot about its behaviour, in case you have firewall rules you need to obey.
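
For the last two points, a minimal sketch of starting the distributed scheduler locally with process-based workers (the worker counts here are just an illustration):

    from dask.distributed import Client

    # one single-threaded worker process per core, so GIL-bound string/dict work
    # can run in parallel; the client also serves the diagnostic dashboard
    client = Client(n_workers=nCores, threads_per_worker=1)
    print(client.dashboard_link)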

Answer from Gregory Saxton:

Given a small- or medium-sized dataset, the plain PANDAS solution would work:

    df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)

However, with 16 million rows the PANDAS solution won't run on either a Macbook with 16GB of RAM or a Windows machine with 96GB. For that reason I looked to Dask. As seen in the above answer and comments, however, the Dask solution does not work because each observation in my dataset does not necessarily have all of the dictionary keys. Collectively, the 16 million observations of Form990PartVIISectionAGrp have the 15 keys in the following list:

    newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
               'F9_07_PZ_DIRTRSTKEY_NAME',
               'F9_07_PZ_COMP_OTHER',
               'F9_07_PZ_COMP_RELATED',
               'F9_07_PZ_TITLE',
               'F9_07_PZ_AVE_HOURS_WEEK',
               'F9_07_PZ_COMP_DIRECT',
               'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
               'F9_07_PC_OFFICER',
               'F9_07_PC_HIGH_COMP_EMPLOYEE',
               'BusinessName',
               'F9_07_PC_KEY_EMPLOYEE',
               'F9_07_PC_TRUSTEE_INSTITUTIONAL',
               'NameBusiness',
               'F9_07_PC_FORMER']
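
As an aside, here is a hypothetical sketch of how such a key list could be collected directly from the dictionary column (assuming the column already holds dicts rather than strings):

    allkeys = set()
    for d in df['Form990PartVIISectionAGrp']:
        if isinstance(d, dict):        # skip missing/NaN rows
            allkeys.update(d.keys())
    newkeys = sorted(allkeys)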

So, my solution involved taking some of the hints provided by @mdurant above and first adding any missing keys to each row:

    for index, row in df[:].iterrows():
        for k in newkeys:
            row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)

That took 100 minutes on my Macbook. Based on mdurant's comment, I then saved the dataframe in JSON format:

    df.to_json('df.json', orient='records', lines=True)

And read the file into Dask as text:

    import json
    import dask.bag as db
    b = db.read_text('df.json').map(json.loads)

Then create a function to flatten the column:

    def flatten(record):
        return {
            'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
            'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
            'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
            'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
            'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
            'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],
            'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
            'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
            'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
            'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
            'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
            'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
            'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
            'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
            'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
            'URL': record['URL'],
        }

I can then apply the function:

    df = b.map(flatten).to_dataframe()

And export the data to CSV:

    df.to_csv('compensation*.csv')

This works like a charm! In short, based on mdurant's helpful comments above, the keys are to 1) add missing keys to all observations and 2) avoid reading your data into Dask from PANDAS (use text or CSV instead). Taking care of those two issues led to a good solution to this problem.