Observable: switch if less than X items emitted

72 views Asked by At

I have a list of observables obs1, obs2, obs3,...,

Each of them can emit a number of items (from mongodb database), I am interested only in the first N items. I want to make sure that queries of my observables are executed only if required. In other words, if obs1, for example, produce more than N, the query behind obs2 should not run, etc.

If I use concat: Observable(obs1, obs2, obs3, ...).concat, all of the queries can run in parallel in mongodb

Basically, I am looking for an operation like obs1.switchIfX(obs2).switchIfX(obs3).....

Where X: less than N items are emitted by current observable.

Any idea how I can implement this requirement in rxscala style?

2

There are 2 answers

3
akarnokd On

You could collect the items from the source into a list, check its size in a flatMap operator and switch to another source if the length is not enough:

@Test
public void test() {
    Observable.range(1, 5)
    .compose(switchIfFewer(Observable.range(1, 8), 10))
    .compose(switchIfFewer(Observable.range(1, 15), 10))
    .test()
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}

static <T> ObservableTransformer<T, T> switchIfFewer(Observable<T> other, int n) {
    return o -> {
        return o.toList()
        .flatMapObservable(list -> {
            if (list.size() < n) {
                return other;
            }
            return Observable.fromIterable(list);
        });
    };
}

If you don't want to get more than N items, you can specify o.take(n).toList() instead.

0
Samuel Gruetter On

You could try something like this (untested):

Observable.just(obs1, obs2, obs3).flatten(maxConcurrent=1).take(N)

The maxConcurrent argument ensures that flatten only subscribes to one observable at a time, and once N items have been emitted, take will unsubscribe from the upstream observable, so if at that point, obs2 or obs3 have not yet been subscribed to, they will never be run, as desired.