I receive messages from an observable. These are sometimes not in chronological order, but their value includes a timestamp that can be used for sorting.
So, we do not know when the stream will end, we know that two entries can be in the wrong order, and we want to deliver the elements in the chronological order. A delay of a second or more is acceptable.
Now, to order the items, i want to buffer and order the elements.
I would need to keep about three entries, sort them and release the first, until a few miliseconds have passed or a new item arrives.
So there might be this stream:
//Time item appears: example item:
14:01:01 {time: '14:00:00', name: 'olga'}
14:01:02 {time: '14:00:03', name: 'peter'}
14:01:03 {time: '14:00:02', name: 'ouma'}
14:01:05 {time: '14:00:06', name: 'kat'}
14:01:06 {time: '14:00:05', name: 'anne'}
//... more to come
And what i need is:
//Time item appears: example item:
14:01:05 {time: '14:00:00', name: 'olga'}
14:01:06 {time: '14:00:02', name: 'ouma'}
14:01:07 {time: '14:00:03', name: 'peter'}
14:01:08 {time: '14:00:05', name: 'anne'}
14:01:09 {time: '14:00:06', name: 'kat'}
// ... more to come
So in the example, i would wait for three elements, sort them and release the first (in the example olga).
Then, after a few miliseconds, kat arrives and would have to be added to the array to be sorted, release the first element (ouma) and then again waiting for a certain time to pass or a new element to arrive.
There is the buffer and bufferCount operator but i can't wrap my head arround how to use those to achieve this, or if there is a more suitable operator maybe.
The only solution i could think of is
- storing the observable results in an array.
- using setInterval as a
delayto regularly sort and release the oldest three elements. - have flag and counters to determine if nothing happended for a while to then send the remaining sorted elements and terminate the interval
- (re)trigger the interval with every new element, if it is not already set
Here is the Solution i came up with: https://stackblitz.com/edit/typescript-xtjuu8?file=index.ts
Is there a better way to do this?
This was a fun one!
I played a bit with the
bufferoperator, at first:If
streamOrdersyields events according to the following table:{ order: 1, name: 'olga' }{ order: 3, name: 'peter' }{ order: 2, name: 'ouma' }{ order: 5, name: 'kat' }{ order: 4, name: 'anne' }... then here is what you get instead:
{ order: 1, name: 'olga' }{ order: 2, name: 'ouma' }{ order: 3, name: 'peter' }{ order: 4, name: 'anne' }{ order: 5, name: 'kat' }It works by buffering all events until the one that's expected to happen next, then merely sorting and merging in the buffer.
It's pretty neat and gets the job done if we go by your sample input!
Sadly, I realised that if you instead receive an order quite earlier than you should, it would still be emitted out of order: consider the case where you get:
[1, 10, 3, 4, 5, 2]... you'd end up with[1, 2, 3, 4, 5, 10]; receiving the1right away, and the 5 order values, including10"all at once" when2arrives.To circumvent this issue, I had to take another approach, using what's essentially a companion
Subject<number>to delay each order received until its number is up.See it in action here on StackBlitz.com