Dask DataFrame: Resample over groupby object with multiple rows


I have the following dask dataframe created from Castra:

import dask.dataframe as dd

df = dd.from_castra('data.castra', columns=['user_id','ts','text'])

Yielding:

                      user_id   ts                    text
ts
2015-08-08 01:10:00   9235      2015-08-08 01:10:00   a
2015-08-08 02:20:00   2353      2015-08-08 02:20:00   b
2015-08-08 02:20:00   9235      2015-08-08 02:20:00   c
2015-08-08 04:10:00   9235      2015-08-08 04:10:00   d
2015-08-08 08:10:00   2353      2015-08-08 08:10:00   e

What I'm trying to do is:

  1. Group by user_id and ts
  2. Resample it over a 3-hour period
  3. In the resampling step, any merged rows should concatenate the texts

Example output:

                                text
user_id   ts
9235      2015-08-08 00:00:00   ac
          2015-08-08 03:00:00   d
2353      2015-08-08 00:00:00   b
          2015-08-08 06:00:00   e
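For reference, here is roughly what steps 1-3 look like in plain pandas on the sample rows above. This is just a sketch to pin down the intended semantics; binning the timestamps with dt.floor is only one way to express it.

import pandas as pd

# Rebuild the sample frame above purely for illustration.
df = pd.DataFrame({
    'user_id': [9235, 2353, 9235, 9235, 2353],
    'ts': pd.to_datetime(['2015-08-08 01:10:00', '2015-08-08 02:20:00',
                          '2015-08-08 02:20:00', '2015-08-08 04:10:00',
                          '2015-08-08 08:10:00']),
    'text': list('abcde'),
}).set_index('ts', drop=False)

# Bin each timestamp into its 3-hour window and concatenate the texts.
out = df.groupby(['user_id', df['ts'].dt.floor('3h')])['text'].agg(''.join)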

I tried the following:

df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()

And got the following error:

TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex

I also tried inserting set_index('ts') into the pipeline, but it doesn't seem to be an attribute of the Series produced by the groupby.

Any ideas on how to achieve this?

TL;DR

If it makes the problem easier, I'm also able to change the format of the Castra DB I created. The implementation I currently have was largely taken from this great post.

I set the index (in the to_df() function) as follows:

df.set_index('ts',drop=False,inplace=True)

And have:

import os
from bz2 import BZ2File
from castra import Castra
from toolz import partition_all, peek

with BZ2File(os.path.join(S.DATA_DIR, filename)) as f:
    batches = partition_all(batch_size, f)        # lazily split the file into fixed-size batches of lines
    df, frames = peek(map(self.to_df, batches))   # peek at the first frame to use as the Castra template
    castra = Castra(S.CASTRA, template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')     # write out the frames, partitioned on a 3-hour frequency
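For completeness, to_df is roughly along these lines (simplified for the question; treat the record parsing as illustrative, only the set_index call is verbatim from my code):

import json
import pandas as pd

def to_df(self, batch):
    # Turn one batch of raw lines into a DataFrame indexed on ts.
    # (The record layout here is illustrative only.)
    records = [json.loads(line) for line in batch]
    df = pd.DataFrame.from_records(records, columns=['user_id', 'ts', 'text'])
    df['ts'] = pd.to_datetime(df['ts'])
    df.set_index('ts', drop=False, inplace=True)
    return df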

Here are the resulting dtypes:

ts                datetime64[ns]
text                      object
user_id                  float64

There are 2 answers

Answer by MRocklin (accepted, 5 votes)

If we can assume that each user-id group fits in memory, then I recommend using dask.dataframe to do the outer groupby, but then using pandas to do the operations within each group, something like the following:

def per_group(blk):
    return blk.groupby('ts').text.resample('3H', how='sum')

df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()

This decouples two hard things into two different projects:

  1. Shuffling all of the user-ids together into the right groups is handled by dask.dataframe
  2. Doing the complex datetime resampling within each group is handled explicitly by pandas.

Ideally dask.dataframe would write the per-group function for you automatically. At the moment dask.dataframe does not intelligently handle multi-indexes, or resampling on top of multi-column groupbys, so the automatic solution isn't yet available. Still, it's quite possible to fall back to pandas for the per-block computation while still using dask.dataframe to prepare the groups accordingly.
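With current pandas and dask releases the same pattern looks a little different, since resample's how= keyword is gone from pandas and dask's groupby apply takes meta= rather than columns=. A rough sketch, assuming ts is still present as a column in each block (as in the question, where set_index used drop=False); the meta specification and the ''.join aggregation are my assumptions, not part of the original answer:

import pandas as pd

def per_group(blk):
    # blk is a plain pandas DataFrame holding one user_id's rows:
    # index it on ts, sort by time, resample, and join the texts per bin.
    return blk.set_index('ts')['text'].sort_index().resample('3h').agg(''.join)

result = (
    df.groupby('user_id')
      .apply(per_group, meta=('text', 'object'))
      .compute()
)

Empty 3-hour bins may come back as empty strings or NaN, depending on the pandas version, and can be dropped afterwards.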

Answer by chown (0 votes)

Try converting your index to a DatetimeIndex like this:

import datetime
import pandas as pd
# ...
# dask itself has no DatetimeIndex type; pandas provides it
df.index = pd.DatetimeIndex(df.index.map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')))
# ...
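If df here is the dask DataFrame itself rather than one of its pandas partitions, the same conversion could be done per partition instead, since assigning df.index directly isn't supported on a dask DataFrame. A sketch under that assumption, and assuming the index is stored as strings:

import pandas as pd

def parse_index(part):
    # part is a single pandas partition; parse its string index into datetimes.
    part = part.copy()
    part.index = pd.to_datetime(part.index, format='%Y-%m-%d %H:%M:%S')
    return part

df = df.map_partitions(parse_index)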