How to compose an observable with a task that might need to be re-run?


Suppose I have an async method that runs a query and returns a set of results:

Task<IEnumerable<Foo>> FetchResultSet();

And I have an observable that fires whenever the result set needs to be fetched again:

IObservable<Unit> NeedToRefetch;

What I want is:

IObservable<IEnumerable<Foo>>

Which would (1) run the query and yield an initial result set, and (2) each time NeedToRefetch fires, run the query again and yield another result set.

What's the best way to compose this observable?

If I didn't need that initial result set, I could do this:

NeedToRefetch
    .Select(_ => Observable.FromAsync(() => FetchResultSet()))
    .Concat();

So to make sure that the query gets run at least once, I could do this:

Observable.Return(Unit.Default)
    .Merge(NeedToRefetch)
    .Select(_ => Observable.FromAsync(() => FetchResultSet()))
    .Concat();

But then I started reading about cold and hot observables and I wondered if instead I should do something like this:

var initial = Observable.FromAsync(() => FetchResultSet());
var later = NeedToRefetch
    .Select(_ => Observable.FromAsync(() => FetchResultSet()))
    .Concat();
initial
    .Merge(later);

And then I wondered if this is a case where I am supposed to use Observable.Create.

And then I stopped wondering and wrote this question.


There is 1 answer

Jon G Stødle

I'd say you're almost there. You can use your original approach. To make the query run once immediately on subscription, add a StartWith(), which emits a value ahead of anything from NeedToRefetch:

NeedToRefetch
    .StartWith(Unit.Default)
    .Select(_ => Observable.FromAsync(() => FetchResultSet()))
    .Concat();
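
For anyone who wants to try this end to end, here is a minimal sketch of the same pipeline as a console program. It assumes the System.Reactive package is referenced. The `Foo` class, the stubbed `FetchResultSet`, and the `Subject<Unit>` standing in for `NeedToRefetch` are hypothetical placeholders, not part of the original question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical result type; the question does not define Foo.
class Foo { }

static class Demo
{
    // Hypothetical stand-in for the real query.
    static Task<IEnumerable<Foo>> FetchResultSet() =>
        Task.FromResult<IEnumerable<Foo>>(new[] { new Foo() });

    static void Main()
    {
        // Assumption: a Subject plays the role of NeedToRefetch here.
        var needToRefetch = new Subject<Unit>();

        IObservable<IEnumerable<Foo>> results = needToRefetch
            .StartWith(Unit.Default)   // trigger one fetch as soon as someone subscribes
            .Select(_ => Observable.FromAsync(() => FetchResultSet()))
            .Concat();                 // run the fetches one at a time, in order

        using var subscription = results.Subscribe(
            set => Console.WriteLine("Received a result set"));

        // Ask for another fetch, as NeedToRefetch would.
        needToRefetch.OnNext(Unit.Default);
    }
}
```

Because Concat only subscribes to the next inner observable after the previous one completes, a refetch requested while a query is still running waits its turn rather than overlapping with it. With a real asynchronous query the results arrive later, so the subscription has to be kept alive for as long as result sets are wanted.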