Buffering and sorting items in a stream


I receive messages from an observable. These are sometimes not in chronological order, but their value includes a timestamp that can be used for sorting.

So we do not know when the stream will end, we know that two entries can arrive in the wrong order, and we want to deliver the elements in chronological order. A delay of a second or more is acceptable.

To put the items in order, I want to buffer the elements and sort them.

I would need to hold about three entries, sort them, release the first, and then wait until a few milliseconds have passed or a new item arrives.

So there might be this stream:

//Time item appears:   example item:
14:01:01               {time: '14:00:00', name: 'olga'}
14:01:02               {time: '14:00:03', name: 'peter'}
14:01:03               {time: '14:00:02', name: 'ouma'}
14:01:05               {time: '14:00:06', name: 'kat'}
14:01:06               {time: '14:00:05', name: 'anne'}
//... more to come

And what I need is:

//Time item appears:   example item:
14:01:05               {time: '14:00:00', name: 'olga'}
14:01:06               {time: '14:00:02', name: 'ouma'}
14:01:07               {time: '14:00:03', name: 'peter'}
14:01:08               {time: '14:00:05', name: 'anne'}
14:01:09               {time: '14:00:06', name: 'kat'}
// ... more to come

So in the example, I would wait for three elements, sort them and release the first (in the example, olga). Then, after a few milliseconds, kat arrives and has to be added to the array to be sorted. The first element (ouma) is released, and the process waits again for a certain time to pass or for a new element to arrive.
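To make the idea concrete, here is a minimal sketch of just the buffering part, in plain TypeScript without RxJS or timers. The names `TimedItem`, `ReorderBuffer` and `windowSize` are made up for illustration, and it assumes the `time` strings are zero-padded so that comparing them as strings matches chronological order.

```typescript
// Hypothetical item shape, mirroring the example items above.
interface TimedItem {
  time: string; // zero-padded 'HH:mm:ss', so string order equals time order
  name: string;
}

// Holds items until `windowSize` of them are present, then releases the oldest.
class ReorderBuffer {
  private held: TimedItem[] = [];
  private readonly windowSize: number;

  constructor(windowSize: number) {
    this.windowSize = windowSize;
  }

  // Add an item; returns the items that may be released now (zero or one).
  push(item: TimedItem): TimedItem[] {
    this.held.push(item);
    this.held.sort((a, b) => (a.time < b.time ? -1 : a.time > b.time ? 1 : 0));
    return this.held.length >= this.windowSize ? [this.held.shift()!] : [];
  }

  // Release everything still held, e.g. once nothing has arrived for a while.
  flush(): TimedItem[] {
    const rest = this.held;
    this.held = [];
    return rest;
  }
}

const incoming: TimedItem[] = [
  { time: '14:00:00', name: 'olga' },
  { time: '14:00:03', name: 'peter' },
  { time: '14:00:02', name: 'ouma' },
  { time: '14:00:06', name: 'kat' },
  { time: '14:00:05', name: 'anne' },
];

const reorder = new ReorderBuffer(3);
const released: string[] = [];
for (const item of incoming) {
  for (const out of reorder.push(item)) released.push(out.name);
}
for (const out of reorder.flush()) released.push(out.name);

console.log(released.join(', ')); // olga, ouma, peter, anne, kat
```

A window of three only repairs items that are at most two positions out of place; anything later than that would still come out in the wrong order.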

There are the buffer and bufferCount operators, but I can't wrap my head around how to use them to achieve this, or whether there is a more suitable operator.

The only solution I could think of is:

  • storing the observable results in an array
  • using setInterval as a delay to regularly sort and release the oldest three elements
  • having a flag and counters to determine whether nothing has happened for a while, in order to send the remaining sorted elements and terminate the interval
  • (re)triggering the interval with every new element, if it is not already set

Here is the solution I came up with: https://stackblitz.com/edit/typescript-xtjuu8?file=index.ts

Is there a better way to do this?

There is 1 answer

Answered by ccjmne

This was a fun one!
I played a bit with the buffer operator, at first:

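import { buffer, exhaustMap, filter, map, mergeMap, share, startWith, take } from 'rxjs/operators'

// assuming a stream of events like { order: number, name: string }
const event$ = streamOrders().pipe(share())

event$.pipe(
  // close the current buffer whenever the notifier below emits
  buffer(event$.pipe(
    map(({ order }) => order),
    startWith(null),
    // wait for the very first event, then for the event numbered `order + 1`
    exhaustMap(order => event$.pipe(
      order === null ? take(1) : filter(({ order: latest }) => latest === order + 1),
      take(1),
    )),
  )),
  // sort each closed buffer and emit its events one by one
  mergeMap(events => events.sort(({ order: a }, { order: b }) => a - b)),
).subscribe(console.log)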

If streamOrders yields events according to the following table:

//Time item appears:   example item:
14:01:01               { order: 1, name: 'olga' }
14:01:02               { order: 3, name: 'peter' }
14:01:03               { order: 2, name: 'ouma' }
14:01:05               { order: 5, name: 'kat' }
14:01:06               { order: 4, name: 'anne' }

... then here is what you get instead:

//Time item appears:   example item:
14:01:01               { order: 1, name: 'olga' }
14:01:03               { order: 2, name: 'ouma' }
14:01:03               { order: 3, name: 'peter' }
14:01:06               { order: 4, name: 'anne' }
14:01:06               { order: 5, name: 'kat' }

It works by buffering all events until the one that's expected to happen next arrives, then sorting the buffer and emitting its contents one by one.
It's pretty neat and gets the job done if we go by your sample input!

Sadly, I realised that if you receive an order much earlier than you should, it is still emitted out of order. Consider the case where you get [1, 10, 3, 4, 5, 2]: you'd end up with [1, 2, 3, 4, 5, 10], receiving the 1 right away and the other five values, including 10, "all at once" when 2 arrives.
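To see that flaw without running any RxJS, here is a simplified plain-TypeScript model of the "buffer until the expected order arrives, then flush sorted" idea. It is not the pipeline above: `flushOnExpected` is a made-up helper, and the rule that the next expected order is the smallest number not yet seen is an assumption chosen to match the behaviour described.

```typescript
// Simplified model: collect orders until the expected one arrives, then flush them sorted.
// Assumption: after a flush, the next expected order is the smallest one not yet seen.
function flushOnExpected(orders: number[]): number[][] {
  const seen = new Set<number>();
  const batches: number[][] = [];
  let pending: number[] = [];
  let expected: number | null = null; // null: flush on the very first order

  for (const order of orders) {
    seen.add(order);
    pending.push(order);
    if (expected === null || order === expected) {
      batches.push(pending.sort((a, b) => a - b));
      pending = [];
      // skip past every order number that has already come through
      let next = order + 1;
      while (seen.has(next)) next += 1;
      expected = next;
    }
  }
  return batches;
}

console.log(JSON.stringify(flushOnExpected([1, 3, 2, 5, 4])));     // [[1],[2,3],[4,5]]
console.log(JSON.stringify(flushOnExpected([1, 10, 3, 4, 5, 2]))); // [[1],[2,3,4,5,10]]
```

In the second run, 10 leaves in the same batch as 2 to 5, well before 6 to 9 have had a chance to arrive.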


To circumvent this issue, I had to take another approach, using what's essentially a companion Subject<number> to delay each order received until its number is up.

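import { BehaviorSubject, asyncScheduler } from 'rxjs'
import { delayWhen, filter, map, observeOn, share } from 'rxjs/operators'

// the order number allowed through next; null means "let the first event pass"
const expected$ = new BehaviorSubject<number | null>(null)
const event$ = streamOrders().pipe(
  // hold each event back until expected$ announces its order number
  delayWhen(({ order }) => expected$.pipe(filter(next => next === null || next === order))),
  share(),
)

// after each released event, announce the following order number
event$.pipe(
  map(({ order }) => order + 1),
  observeOn(asyncScheduler),
).subscribe(expected$)

event$.subscribe(console.log)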

See it in action here on StackBlitz.com
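For anyone who wants the "wait until its number is up" idea without the RxJS plumbing, here is a rough plain-TypeScript sketch of the same logic. `OrderedEvent` and `SequenceGate` are hypothetical names, and like the code above it assumes orders are consecutive integers and that the first event to arrive is the first in the sequence.

```typescript
// Hypothetical event shape, matching { order: number, name: string } above.
interface OrderedEvent {
  order: number;
  name: string;
}

// Parks events that arrive early and releases them once their number is up.
class SequenceGate {
  private parked = new Map<number, OrderedEvent>();
  private expected: number | null = null; // null: accept whichever event arrives first

  // Accept one event; returns every event that can now be released, in order.
  accept(event: OrderedEvent): OrderedEvent[] {
    this.parked.set(event.order, event);
    let next = this.expected ?? event.order;
    const ready: OrderedEvent[] = [];
    while (this.parked.has(next)) {
      ready.push(this.parked.get(next)!);
      this.parked.delete(next);
      next += 1;
    }
    this.expected = next;
    return ready;
  }
}

const gate = new SequenceGate();
const emitted: number[] = [];
for (const order of [1, 10, 3, 4, 5, 2]) {
  for (const out of gate.accept({ order, name: `item-${order}` })) {
    emitted.push(out.order);
  }
}

console.log(emitted.join(', ')); // 1, 2, 3, 4, 5
```

Note that 10 stays parked until 6 to 9 have come through, so a gap that never fills holds back everything after it; a timeout-based flush would be one way around that if it matters for your stream.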