Reactive Extensions C# - How do I skip message while executing a (relatively) long-running subscription logic

131 views Asked by At

I am new to all of the Reactive programming concepts and I need to deal with the following situation -- I have a consumer logic that is relatively long-running, in a way that a couple of messages may be produced while a preceding message is still being processed.

I need to skip those messages and prevent calling the relevant consume logic for those. Here is an example (in C#)

IObservable<int> stream  = ...
stream.SubscribeOn(TaskPoolScheduler.Default).Subsribe(ProcessMessage, cancellationToken);

Imagine that a message is pushed to the stream every 5 milliseconds: (1, 2, 3, 4, 5)

The ProcessMessage method may take about 10-15ms to complete, meaning when it received message 1, it will still be working when 2 and 3 are being produced.

I need to skip calling ProcessMessage for 2 and 3 and catch up with 4 directly, or whatever the next unprocessed message will be.

Is there any built-in construct in Reactive extensions that allows me to handle this particular case?

0

There are 0 answers