I am looking for something similar to the exhaustMap
operator from rxjs
, but RX.NET
does not seem to have such an operator.
What I need to achieve is that, upon every element of the source stream, I need to start an async
handler, and until it finishes, I would like to drop any elements from the source. As soon as the handler finishes, resume taking elements.
What I don't want is to start an async handler upon every element - while the handler runs, I want to drop source elements.
I also suspect I need to cleverly use the defer operator here?
Thank you!
Here is an implementation of the
ExhaustMap
operator. The source observable is projected to anIObservable<(Task<TResult>, int)>
, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with theDistinctUntilChanged
operator, and finally the observable is flattened with theConcat
operator.The tasks returned by the
function
are not guaranteed to be distinct. For example the methodasync Task<T> Return<T>(T result) => result;
returns always the sameTask
forresult = 1
orresult = false
. Hence the need for the incrementedId
in the above implementation, that individualizes the tasks, so that theDistinctUntilChanged
doesn't filter out tasks from separatefunction
invocations.Usage example:
Output:
Online demo.
Here is an alternative implementation of the
ExhaustMap
, where thefunction
produces anIObservable<TResult>
instead of aTask<TResult>
: