What would be the fastest way to convert Redis Stream output (aioredis client / hiredis parser) to a Pandas DataFrame in which the Redis Stream ID's timestamp and sequence number, as well as the values, are properly type-converted Pandas index columns?

Example Redis output:

[[b'1554900384437-0', [b'key', b'1']], 
[b'1554900414434-0', [b'key', b'1']]]

3 Answers

Community On Best Solutions

There seem to be two main bottlenecks here:

  1. Pandas DataFrames store their data in column-major format, meaning each column maps to one numpy array, whereas the Redis stream data is row-by-row.

  2. Pandas MultiIndex is made for categorical data, and converting raw arrays to the required levels/codes structure does not appear to be optimized.

Because of point 1, looping over all Redis stream entries is unavoidable. Assuming we know the length beforehand, we can pre-allocate numpy arrays that we fill as we go along, and with some tricks reuse these arrays as the DataFrame columns. If the overhead of looping in Python is still too much, rewriting the loop in Cython should be straightforward.

Since you didn't specify data types, this answer keeps everything as bytes in numpy object arrays; it should be reasonably obvious how to adapt this to a custom setting. The only reason to put all of the columns in the same array is to move the inner loop over the columns/fields from Python to C. The array can be split up into, e.g., one array per data type or one array per column.

from functools import partial, reduce
import numpy as np
import pandas as pd

data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
        [b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]
# Field names sit at the even positions of the first entry's field list.
colnames = data[0][1][0::2]
ncols = len(colnames)
nrows = len(data)
# Pre-allocate with one row per output column, so each column is contiguous.
ts_seq = np.empty((2, nrows), dtype=np.int64)
cols = np.empty((ncols, nrows), dtype=object)  # np.object was removed in NumPy 1.24

for i, (entry_id, fields) in enumerate(data):
    ts, seq = entry_id.split(b"-", 1)  # ID format: b'<ms timestamp>-<sequence>'
    ts_seq[:, i] = (int(ts), int(seq))
    cols[:, i] = fields[1::2]  # values sit at the odd positions

# Wrap each column in its own single-column DataFrame, then join them on the index.
colframes = [pd.DataFrame(cols[i:i+1, :].T) for i in range(ncols)]
merge = partial(pd.merge, left_index=True, right_index=True, copy=False)
df = reduce(merge, colframes[1:], colframes[0])
df.columns = colnames
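The loop above keeps every value as bytes. As a minimal sketch of the "one array per column" variant with typed columns, the following assumes a known schema; the `schema` mapping and its converters are hypothetical and not part of the question, and it gives up the single-array trick in exchange for proper dtypes.

```python
import numpy as np
import pandas as pd

data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
        [b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]

# Hypothetical schema: field name -> (numpy dtype, converter from bytes).
schema = {
    b'foo': (np.int64, int),
    b'bar': (np.int64, int),
    b'bla': (object, bytes.decode),
}

nrows = len(data)
# One pre-allocated array per column, each with its own dtype.
columns = {name: np.empty(nrows, dtype=dtype) for name, (dtype, _) in schema.items()}

for i, (_, fields) in enumerate(data):
    # Field names are at even positions, values at odd positions.
    for name, value in zip(fields[0::2], fields[1::2]):
        columns[name][i] = schema[name][1](value)

df_typed = pd.DataFrame({name.decode(): arr for name, arr in columns.items()})
```

The inner loop over fields now runs in Python, so this is likely slower per row than the single object array; whether the typed columns are worth it depends on what happens to the data afterwards.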

For point 2, we can use numpy.unique to create the levels/codes structure needed by Pandas MultiIndex. numpy.unique returns the sorted unique values, so it also sorts the data. Since our data is presumably already sorted, a possible future optimisation would be to skip the sorting step.

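ts = ts_seq[0, :]
seq = ts_seq[1, :]
maxseq = np.max(seq)
# Levels are the distinct timestamps; codes map each row to its level.
ts_levels, ts_codes = np.unique(ts, return_inverse=True)
# Sequence numbers are small non-negative integers, so they can serve as their own codes.
seq_levels = np.arange(maxseq+1)
seq_codes = seq
df.index = pd.MultiIndex(levels=[ts_levels, seq_levels], codes=[ts_codes, seq_codes], names=["Timestamp", "Seq"])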
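The optimisation of skipping the sort could look roughly like the sketch below. It assumes the timestamps are already in non-decreasing order, which holds for entries read from a single stream in order but is not checked here; `sorted_unique_inverse` is a hypothetical helper name, not a numpy function.

```python
import numpy as np

def sorted_unique_inverse(values):
    """Levels and codes for an already sorted 1-D array, without sorting."""
    values = np.asarray(values)
    if values.size == 0:
        return values.copy(), np.empty(0, dtype=np.intp)
    # Mark the first element of every run of equal values.
    is_new = np.empty(values.shape, dtype=bool)
    is_new[0] = True
    np.not_equal(values[1:], values[:-1], out=is_new[1:])
    levels = values[is_new]
    # The running count of run starts, minus one, is the level index.
    codes = np.cumsum(is_new) - 1
    return levels, codes

ts_example = np.array([10, 10, 20, 30, 30], dtype=np.int64)
levels, codes = sorted_unique_inverse(ts_example)
```

For sorted input this should produce the same levels and codes as `np.unique(ts, return_inverse=True)` in a single linear pass.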

Finally, we can verify that there was no copying involved by doing

cols[0, 0] = b'79'

and checking that the corresponding entry in df does indeed change.
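A less manual check is to ask numpy directly whether the DataFrame still points at the original buffer. This is only a sketch: `shares_buffer` is a hypothetical helper, and whether the buffer is actually shared can depend on the pandas version and its copy settings, so the example reports the result rather than assuming it.

```python
import numpy as np
import pandas as pd

def shares_buffer(array, frame):
    """Return True if any column of `frame` is backed by memory from `array`."""
    return any(
        np.shares_memory(array, frame.iloc[:, j].to_numpy())
        for j in range(frame.shape[1])
    )

raw = np.arange(6, dtype=np.int64).reshape(2, 3)
# copy=False asks pandas to reuse the array if it can.
frame = pd.DataFrame(raw.T, copy=False)
print(shares_buffer(raw, frame))
```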

Ali Hallaji On

you can use this:

Paul Kovtun On

The quickest way is to process the data in batches:

  1. Do the IO in batches of N messages (e.g. 100 messages per batch)

  2. Convert this batch into 1 Dataframe (using pd.DataFrame([]))

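  3. Apply a lambda or conversion function to the timestamp column after converting it to a numpy array (.values). Stream IDs carry a millisecond timestamp, so it has to be cast to a number and divided by 1000 first, along the lines of:

    df['time'] = [datetime.fromtimestamp(int(t.split('-')[0]) / 1000) for t in df['time'].values]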
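The three steps above can be sketched end to end as follows. This is an illustration under assumptions, not the answerer's code: `batch_to_frame` is a hypothetical helper, the batch is assumed to be non-empty and shaped like the example output in the question, and the timestamp conversion uses `pd.to_datetime` with `unit="ms"` instead of a per-row `datetime.fromtimestamp` call.

```python
import numpy as np
import pandas as pd

def batch_to_frame(batch):
    """Convert one non-empty batch of raw stream entries into a DataFrame."""
    # Split each ID of the form b'<ms timestamp>-<sequence>' once.
    ids = [entry_id.split(b"-", 1) for entry_id, _ in batch]
    ms = np.array([int(ts) for ts, _ in ids], dtype=np.int64)
    seq = np.array([int(s) for _, s in ids], dtype=np.int64)
    # Field names are at even positions, values at odd positions.
    rows = [
        {k.decode(): v for k, v in zip(fields[0::2], fields[1::2])}
        for _, fields in batch
    ]
    df = pd.DataFrame(rows)
    # Vectorised conversion of millisecond timestamps for the whole batch.
    df["time"] = pd.to_datetime(ms, unit="ms")
    df["seq"] = seq
    return df

batch = [[b'1554900384437-0', [b'key', b'1']],
         [b'1554900414434-0', [b'key', b'1']]]
df_batch = batch_to_frame(batch)
```

Frames from several batches can then be combined with `pd.concat`. The values are left as bytes here, since the question does not say which types they should have.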