I have the following dask dataframe created from Castra:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
Yielding:
                     user_id                  ts text
ts
2015-08-08 01:10:00     9235 2015-08-08 01:10:00    a
2015-08-08 02:20:00     2353 2015-08-08 02:20:00    b
2015-08-08 02:20:00     9235 2015-08-08 02:20:00    c
2015-08-08 04:10:00     9235 2015-08-08 04:10:00    d
2015-08-08 08:10:00     2353 2015-08-08 08:10:00    e
What I'm trying to do is:
- Group by user_id and ts
- Resample over a 3-hour period
- In the resampling step, any merged rows should have their texts concatenated
Example output:
                             text
user_id ts
9235    2015-08-08 00:00:00    ac
        2015-08-08 03:00:00     d
2353    2015-08-08 00:00:00     b
        2015-08-08 06:00:00     e
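(For what it's worth, if everything fit in memory the target result could be produced in plain pandas; the snippet below is only an illustration of the 3-hour binning and text concatenation I'm after, and assumes a pandas version where pd.Grouper accepts freq.)

import pandas as pd

pdf = df.compute()  # pull the small sample into pandas; the index is the ts datetimes
# Group by user_id and 3-hour bins of the datetime index; summing the
# object-dtype 'text' column concatenates the strings within each bin.
pdf.groupby(['user_id', pd.Grouper(freq='3H')])['text'].sum()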
I tried the following:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
And got the following error:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
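I suspect this is because the result of the groupby/sum is no longer indexed by a plain DatetimeIndex (the group keys (user_id, ts) become the index), and resample requires a datetime-like index:

# Illustrative: the intermediate result is indexed by the (user_id, ts)
# group keys rather than a DatetimeIndex, so the subsequent resample
# raises the TypeError above.
s = df.groupby(['user_id', 'ts'])['text'].sum()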
I tried adding set_index('ts') to the pipeline, but it doesn't seem to be an attribute of Series.
Any ideas on how to achieve this?
TL;DR
If it makes the problem easier, I'm also able to change the format of the Castra DB I created. My current implementation was largely taken from this great post.
I set the index (in the to_df() function) as follows:
df.set_index('ts', drop=False, inplace=True)
And have:
import os
from bz2 import BZ2File
from toolz import partition_all, peek
from castra import Castra

with BZ2File(os.path.join(S.DATA_DIR, filename)) as f:
    batches = partition_all(batch_size, f)
    df, frames = peek(map(self.to_df, batches))
    castra = Castra(S.CASTRA, template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')
Here are the resulting dtypes:
ts datetime64[ns]
text object
user_id float64
If we can assume that each user_id group fits in memory, then I recommend using dask.dataframe to do the outer groupby, but then using pandas to do the operations within each group, something like the sketch that follows. This decouples two hard things across the two projects: dask.dataframe handles shuffling the rows for each user_id into the right group, while pandas handles the datetime resampling within each group.
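A rough sketch of that approach (hedged: written in the older pandas/dask style used above, i.e. resample(..., how='sum'); newer versions would use .resample('3H').sum() instead, and may require an explicit meta=/columns= hint on the groupby-apply to describe the output):

def per_group(blk):
    # blk is a plain pandas DataFrame holding all of the rows for one user_id.
    # Index by timestamp and resample into 3-hour bins; summing the
    # object-dtype 'text' column concatenates the strings within each bin.
    return blk.set_index('ts')['text'].resample('3H', how='sum')

# dask.dataframe shuffles the rows by user_id; pandas does the per-group work.
result = df.groupby('user_id').apply(per_group).compute()

The computed result should come back as a pandas object indexed by (user_id, ts), matching the desired output above.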
Ideally dask.dataframe would write the per-group function for you automatically. At the moment dask.dataframe does not intelligently handle multi-indexes, or resampling on top of multi-column groupbys, so the automatic solution isn't yet available. Still, it's quite possible to fall back to pandas for the per-block computation while still using dask.dataframe to prepare the groups accordingly.