Why flatMap is implemented with merge in RxJava?

2.1k views Asked by At

Why is RxJava 1.x flatMap() operator is implemented with merge?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

From flatMap() call I can return only one Observable that conforms to <? extends Observable<? extends R>>. Than map(func) call wraps it into another Observable, so that we have something like this Observable<? extends Observable<? extends R>>. That makes me thinking that merge() call after map(func) is unnecessary.

The merge() operator is said to perform the following:

Flattens an Observable that emits Observables into a single Observable that emits the items emitted by those Observables, without any transformation.

Now inside the flat map we can only have an Observable that emits one Observable. Why merge? What am I missing here?

Thank you.

2

There are 2 answers

0
Simon Baslé On BEST ANSWER

Looking at the signatures might help:

Imagine that with an Observable<String> you want to flatMap to the individual characters. The way of doing that with flatMap is:

Observable.just("foo", "hello")
          .flatMap(s -> Observable.from(s.split("")))

What's the type of this Observable? It is an Observable<String>.

Now instead of using flatMap, use map with the same function. What's the type?

Observable.just("foo", "hello")
          .map(s -> Observable.from(s.split("")))

You'll see that it is actually Observable<Observable<String>>... And if we subscribe to this observable and print out the emitted items, we get this:

rx.Observable@5b275dab
rx.Observable@61832929

Not very useful. Worse, these Observables haven't been subscribed to, so they won't emit any data :(

We see that the goal of flatMap is to let the function produce an inner Observable<T> per source item, and then subscribe to these inner observables and flatten their emission together in the output Observable<T>. And merge does just that!

To verify that, wrap the above map result in an Observable.merge(...):

Observable<Observable<String>> mapped = 
    Observable.just("foo", "hello")
              .map(s -> Observable.from(s.split("")));

Observable.merge(mapped)
          .subscribe(System.out::println);

This outputs:

f
o
o
h
e
l
l
o
0
Tassos Bassoukos On

Now inside the flat map we can only have an Observable that emits one Observable.

No - you have one observable that emits one observable per source observable item. So if your source observable has more items, you are going to have multiple observables emited.

That's why you need merge().