How to filter an Observable based on the value of another?


I'm trying to find some way of filtering an observable based on the value of another observable. For example, let's say we only want to receive events between time x and y. Can one filter an observable based on the value of the timer?


There are 2 answers

Brandon

Several ways. Without some code it is hard to know which is best.

via CombineLatest (listen all the time and just filter based on the most recent value):

var astream = ...;
var bstream = ...;
var filtered = Observable.CombineLatest(astream, bstream, (a, b) => new { a, b })
          .Where(v => v.b >= x && v.b <= y)
          .Select(v => v.a); // Alas sometimes you will get duplicate a's.
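
To make the duplicate issue concrete, here is a minimal sketch that drives the same CombineLatest pipeline with Subject<int> stand-ins. The class name, the bounds x = 5 and y = 8, and the pushed values are all assumptions for illustration, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestDemo
{
    public static List<int> Run()
    {
        var astream = new Subject<int>();   // values we want to filter
        var bstream = new Subject<int>();   // values that control the filter
        const int x = 5, y = 8;             // hypothetical bounds

        var received = new List<int>();
        using var subscription = Observable
            .CombineLatest(astream, bstream, (a, b) => new { a, b })
            .Where(v => v.b >= x && v.b <= y)
            .Select(v => v.a)
            .Subscribe(received.Add);

        astream.OnNext(1);   // nothing yet: bstream has no value
        bstream.OnNext(6);   // pairs (1, 6): in range, emits 1
        bstream.OnNext(7);   // pairs (1, 7): in range, emits 1 again (the duplicate)
        astream.OnNext(2);   // pairs (2, 7): emits 2
        bstream.OnNext(9);   // pairs (2, 9): out of range, dropped
        astream.OnNext(3);   // pairs (3, 9): out of range, dropped
        return received;
    }

    public static void Main() => Console.WriteLine(string.Join(",", Run()));
}
```

This should print 1,1,2. Every change in bstream re-emits the latest a while the condition holds, so if that matters you would need to track which a's are new yourself.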

via Select and Switch (only listen to astream when bstream meets some condition):

var astream = ...;
var bstream = ...;
var filtered = bstream
    .Select(b => (b >= x && b <= y) ? astream : Observable.Empty<T>())
    .Switch();
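
For comparison, a minimal sketch of the Select + Switch version driven the same way. Again the class name, the bounds, and the pushed values are assumptions for illustration, and Subject<int> stands in for whatever your real streams are (System.Reactive assumed).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchGateDemo
{
    public static List<int> Run()
    {
        var astream = new Subject<int>();   // values we want to filter
        var bstream = new Subject<int>();   // values that control the filter
        const int x = 5, y = 8;             // hypothetical bounds

        var received = new List<int>();
        using var subscription = bstream
            .Select(b => (b >= x && b <= y) ? astream : Observable.Empty<int>())
            .Switch()
            .Subscribe(received.Add);

        astream.OnNext(1);   // dropped: nothing is subscribed to astream yet
        bstream.OnNext(6);   // in range: Switch subscribes to astream
        astream.OnNext(2);   // passes through
        astream.OnNext(3);   // passes through
        bstream.OnNext(9);   // out of range: Switch moves to the empty stream
        astream.OnNext(4);   // dropped
        return received;
    }

    public static void Main() => Console.WriteLine(string.Join(",", Run()));
}
```

This should print 2,3. There are no duplicates here, but note that each in-range b value causes a fresh subscription to astream, which matters if astream is cold.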

James World

As Brandon mentioned, there are many ways to combine event streams.

Reactive joins with Observable.Join are a very general purpose tool, but a significant proportion of the Rx built-in library of operators combines streams in ways that can support filtering of one based on another.

I really like Brandon's Select + Switch technique (+1 from me); I've filed that away for future use!

Here is an approach that directly tackles the stated problem of filtering a source stream to a start and end time. It has some advantages over Select + Switch including:

  • It avoids checking the filter condition for every event of the source stream.
  • It avoids needing an "empty" stream.
  • It subscribes only once to the source.
  • It sends OnCompleted() as soon as the end time is reached instead of lasting as long as the source stream does.

It really boils down to a particular overload of the Observable.Window operator, but I'll explain it step by step.

The basic idea is to filter the source stream by applying a Window that opens at the start time and closes at the end time.

First, let's create an example source stream (xs) of one-second pulses, along with a start time and an end time:

var xs = Observable.Interval(TimeSpan.FromSeconds(1));

var startTime = DateTime.Now + TimeSpan.FromSeconds(5);    
var endTime = DateTime.Now + TimeSpan.FromSeconds(8);

Note that, for brevity, I'm not checking that startTime is before endTime. Now we create a stream to open the window, and a stream to close the window:

var start = Observable.Timer(startTime);
var end = Observable.Timer(endTime);

And finally filter the source stream using Observable.Window. The output of this operator is a stream of streams (IObservable<IObservable<T>>) - each sub-stream is a new window.

The overload we will use accepts a stream whose events mark the opening of a new window, and a factory function to provide a closing stream given the event that triggered a window opening.

Using our Timer streams, we know there will be exactly one window created at the start time, and closed at the end time.

We flatten the stream of streams using Observable.Merge:

var filtered = xs.Window(start, _ => end).Merge();

If we subscribe to this like so:

filtered.Subscribe(Console.WriteLine);

We get the following output, as expected:

4
5
6

Again, there are many, many ways to solve this problem, and not just using Window. You can also easily extend this solution to support multiple windows, for example by using a stream of opening times and a factory that returns a closing stream for each opening.
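
As a rough sketch of that multiple-window idea, the same Window overload can be driven by an opening stream that fires more than once. Here Subject instances stand in for real timers so the behaviour is easy to follow; the class name, the opens and closes subjects, and the pushed values are hypothetical, and System.Reactive is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MultiWindowDemo
{
    public static List<int> Run()
    {
        var xs = new Subject<int>();        // source stream
        var opens = new Subject<Unit>();    // each event opens a new window
        var closes = new Subject<Unit>();   // next event closes the open window

        var received = new List<int>();
        using var subscription = xs
            .Window(opens, _ => closes)     // closing stream chosen per opening
            .Merge()                        // flatten the windows back into one stream
            .Subscribe(received.Add);

        xs.OnNext(0);                  // dropped: no window open
        opens.OnNext(Unit.Default);    // first window opens
        xs.OnNext(1);
        xs.OnNext(2);
        closes.OnNext(Unit.Default);   // first window closes
        xs.OnNext(3);                  // dropped: between windows
        opens.OnNext(Unit.Default);    // second window opens
        xs.OnNext(4);
        closes.OnNext(Unit.Default);   // second window closes
        xs.OnNext(5);                  // dropped
        return received;
    }

    public static void Main() => Console.WriteLine(string.Join(",", Run()));
}
```

This should print 1,2,4. With real times you would replace the subjects with timer-based streams, and the factory can use the opening event to decide when that particular window should close.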