I have a situation where there is an observable and let's say 10 observers attached to it. I would like to send the message to each new observer only until an observer somehow says to observable that it recognizes the message and it will process it. At this moment I would like to stop sending this message to other observers.
In other words each observer knows how to process a particular type of message and each one will take and process the message it recognizes. Others don't need to receive it after the one that recognizes it started processing. How this situation could be implemented with the reactive extensions? I assume we need some sort of notification back to the observable but I don't see how can it be done.
This is what I came up with. Any comments, ideas, suggestions and critics is very welcome. The most interesting part is that
IObserver<TValue, TResult>
public interface already exists in the Rx library but it's used only in the Notification object. What I did, I createdIObservable<TValue, TResult>
counterpart, SelectiveSubject to take care of the logic to call observers until one of them returns true and ToSelective method extension. I'm actually surprised that it was not done in the library, at least theIObservable<TValue, TResult>
part. After all,IObserver<TValue, TResult>
existed.Now the usage is as simple as this
In the example the first subscriber takes care of every other value in the sequence and the second subscriber takes care of the rest.