Idiomatic way to simulate a multi-node servicing queue without a loop with Polars?


I am simulating a first-in, first-out servicing queue with a parameterized number of servicing nodes. I'd like to create a user-defined function or Polars expression (in Python, or as a Rust plugin) that computes the simulated begin and end time of each object's servicing. Is there an idiomatic approach using Polars expressions, or an otherwise recommended efficient approach?

Each row of my Polars DataFrame corresponds to an object arriving at the queue at a specific time, requiring a specific number of minutes of servicing once its servicing begins. My input dataframe looks like this:

import polars as pl
df = pl.DataFrame({
    'id': [1,2,3,4],
    'servicing_time_requirement': [30, 5, 30, 5],
    'arrival_time': [0, 15, 16, 17],
})
df
shape: (4, 3)
┌─────┬────────────────────────────┬──────────────┐
│ id  ┆ servicing_time_requirement ┆ arrival_time │
│ --- ┆ ---                        ┆ ---          │
│ i64 ┆ i64                        ┆ i64          │
╞═════╪════════════════════════════╪══════════════╡
│ 1   ┆ 30                         ┆ 0            │
│ 2   ┆ 5                          ┆ 15           │
│ 3   ┆ 30                         ┆ 16           │
│ 4   ┆ 5                          ┆ 17           │
└─────┴────────────────────────────┴──────────────┘

For example, if the number of servicing nodes is 5 (up to five objects can be processed at once), no object ever waits, and the desired output would be:

df.pipe(simulateQueue, nodes=5)
shape: (4, 5)
┌─────┬────────────────────────────┬──────────────┬──────────────────────┬────────────────────┐
│ id  ┆ servicing_time_requirement ┆ arrival_time ┆ servicing_start_time ┆ servicing_end_time │
│ --- ┆ ---                        ┆ ---          ┆ ---                  ┆ ---                │
│ i64 ┆ i64                        ┆ i64          ┆ i64                  ┆ i64                │
╞═════╪════════════════════════════╪══════════════╪══════════════════════╪════════════════════╡
│ 1   ┆ 30                         ┆ 0            ┆ 0                    ┆ 30                 │
│ 2   ┆ 5                          ┆ 15           ┆ 15                   ┆ 20                 │
│ 3   ┆ 30                         ┆ 16           ┆ 16                   ┆ 46                 │
│ 4   ┆ 5                          ┆ 17           ┆ 17                   ┆ 22                 │
└─────┴────────────────────────────┴──────────────┴──────────────────────┴────────────────────┘

But if the number of servicing nodes is 2 (only two objects can be processed at once), some objects must wait in this example, so the desired output would be:

df.pipe(simulateQueue, nodes=2)
shape: (4, 5)
┌─────┬────────────────────────────┬──────────────┬──────────────────────┬────────────────────┐
│ id  ┆ servicing_time_requirement ┆ arrival_time ┆ servicing_start_time ┆ servicing_end_time │
│ --- ┆ ---                        ┆ ---          ┆ ---                  ┆ ---                │
│ i64 ┆ i64                        ┆ i64          ┆ i64                  ┆ i64                │
╞═════╪════════════════════════════╪══════════════╪══════════════════════╪════════════════════╡
│ 1   ┆ 30                         ┆ 0            ┆ 0                    ┆ 30                 │
│ 2   ┆ 5                          ┆ 15           ┆ 15                   ┆ 20                 │
│ 3   ┆ 30                         ┆ 16           ┆ 20                   ┆ 50                 │
│ 4   ┆ 5                          ┆ 17           ┆ 30                   ┆ 35                 │
└─────┴────────────────────────────┴──────────────┴──────────────────────┴────────────────────┘

I currently loop over rows of df, computing servicing_start_time using prior rows' values of servicing_end_time, but it seems like there must be a more efficient way. Polars rolling expressions are one idea that occurred to me, but they don't seem to quite fit this use case.

Current implementation in Python:

nodes = 2
# initialize servicing_end_time as all-null; it is filled in row by row below
df = df.with_columns(servicing_end_time=pl.lit(None, dtype=pl.Int64))
arrival_times = df.get_column("arrival_time")
servicing_end_times = df.get_column("servicing_end_time")
servicing_time_requirements = df.get_column("servicing_time_requirement")
for i in range(servicing_end_times.len()):
    if servicing_end_times[i] is not None:
        continue
    # end time of the job whose node frees up next, once all `nodes` are busy
    next_done = servicing_end_times.filter(
        servicing_end_times.is_not_null()
        & servicing_end_times.rank(method="ordinal", descending=True).eq(nodes)
    )
    if next_done.len() == 0:
        start_time = arrival_times[i]
    else:
        start_time = max(next_done[0], arrival_times[i])
    servicing_end_times[i] = start_time + servicing_time_requirements[i]
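For reference, a plain-Python version of the same FIFO simulation (no Polars, just a min-heap of node-free times) reproduces the expected tables above; it is useful as a ground truth for validating any vectorized rewrite:

```python
import heapq

def simulate_queue(arrivals, durations, nodes):
    """FIFO multi-node queue: returns (start_times, end_times)."""
    free_at = [0] * nodes  # min-heap of times at which each node frees up
    heapq.heapify(free_at)
    starts, ends = [], []
    for arrive, dur in zip(arrivals, durations):
        # service starts at the later of arrival and the earliest free node
        start = max(arrive, heapq.heappop(free_at))
        starts.append(start)
        ends.append(start + dur)
        heapq.heappush(free_at, start + dur)  # this node is busy until end
    return starts, ends

# nodes=2 matches the second expected table:
simulate_queue([0, 15, 16, 17], [30, 5, 30, 5], nodes=2)
# → ([0, 15, 20, 30], [30, 20, 50, 35])
```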

1 Answer

Dean MacGregor (accepted answer):

Your algorithm requires a data-dependent check at every step, which doesn't translate well to Polars expressions. You could probably do it with cumulative_eval, but its documentation warns that it can be O(n^2). Instead you can use numba to compile a ufunc, which will be very fast.

It is a bit cumbersome to run a function that requires two (or more) vector inputs, but it is definitely doable, and you can write a helper expression to make it easier to use. Broadly speaking, you put the inputs in a struct and pass that struct to the ufunc. When creating the struct, the helper aliases the fields, since extracting them later requires knowing their names.

The first thing is to create the ufunc with numba's guvectorize decorator, something like this:

import numba as nb
import numpy as np

@nb.guvectorize([(nb.int64[:], nb.int64[:], nb.int64, nb.int64[:])], '(n),(n),()->(n)', nopython=True)
def effective_start(dur, arrival, num_workers, res):
    # workers[j] holds the time at which worker j is next free
    workers = np.zeros(num_workers, dtype=np.int64)
    for i in range(len(dur)):
        next_avail = np.min(workers)                    # earliest a worker frees up
        next_worker = np.where(workers == next_avail)[0][0]
        res[i] = max(arrival[i], next_avail)            # start: later of arrival and free slot
        workers[next_worker] = res[i] + dur[i]          # that worker is busy until the end time
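To sanity-check the ufunc's logic without compiling it, the loop body can be transcribed into plain Python (a list scan standing in for np.min/np.where); with the question's data and num_workers=2 it reproduces the expected start times:

```python
def effective_start_py(dur, arrival, num_workers):
    # plain-Python mirror of the guvectorize body above
    workers = [0] * num_workers  # time at which each worker is next free
    res = []
    for d, a in zip(dur, arrival):
        next_avail = min(workers)                 # np.min(workers)
        next_worker = workers.index(next_avail)   # np.where(...)[0][0]
        start = max(a, next_avail)
        res.append(start)
        workers[next_worker] = start + d
    return res

effective_start_py([30, 5, 30, 5], [0, 15, 16, 17], 2)  # → [0, 15, 20, 30]
```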

This next step is optional, but it creates an expression wrapper for the ufunc that you can use inside a with_columns or select:

def eff_start(dur, arrival, num_workers):
    if isinstance(dur, str):
        dur = pl.col(dur)
    if isinstance(arrival, str):
        arrival = pl.col(arrival)
    return (
        pl.struct(dur.alias('___dur'), arrival.alias('___arrival'))
        .map_batches(
            lambda x, num_workers=num_workers: effective_start(
                x.struct.field('___dur'),
                x.struct.field('___arrival'),
                num_workers,
            )
        )
    )

With those functions defined you can simply do

df.with_columns(
    z=eff_start('servicing_time_requirement','arrival_time',2)
)

or you can even pass a generator to with_columns to get all the possibilities at once (the output below comes from a different 5-row example frame):

(df.with_columns(
    eff_start('servicing_time_requirement','arrival_time',x).alias(f"workers={x}")
    for x in range(1,df.shape[0]+1)
))
shape: (5, 8)
┌─────┬─────────────────┬──────────────┬───────────┬───────────┬───────────┬───────────┬───────────┐
│ id  ┆ servicing_time_ ┆ arrival_time ┆ workers=1 ┆ workers=2 ┆ workers=3 ┆ workers=4 ┆ workers=5 │
│ --- ┆ requirement     ┆ ---          ┆ ---       ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ i64 ┆ ---             ┆ i64          ┆ i64       ┆ i64       ┆ i64       ┆ i64       ┆ i64       │
│     ┆ i64             ┆              ┆           ┆           ┆           ┆           ┆           │
╞═════╪═════════════════╪══════════════╪═══════════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 1   ┆ 20              ┆ 0            ┆ 0         ┆ 0         ┆ 0         ┆ 0         ┆ 0         │
│ 2   ┆ 3               ┆ 15           ┆ 20        ┆ 15        ┆ 15        ┆ 15        ┆ 15        │
│ 3   ┆ 8               ┆ 16           ┆ 23        ┆ 18        ┆ 16        ┆ 16        ┆ 16        │
│ 4   ┆ 30              ┆ 25           ┆ 31        ┆ 25        ┆ 25        ┆ 25        ┆ 25        │
│ 5   ┆ 5               ┆ 30           ┆ 61        ┆ 30        ┆ 30        ┆ 30        ┆ 30        │
└─────┴─────────────────┴──────────────┴───────────┴───────────┴───────────┴───────────┴───────────┘