Why is the RxJava 1.x flatMap() operator implemented with merge?
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
    if (getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
    }
    return merge(map(func));
}
From the function passed to flatMap() I can return only one Observable, which conforms to <? extends Observable<? extends R>>. Then the map(func) call wraps it in another Observable, so that we have something like Observable<? extends Observable<? extends R>>. That makes me think that the merge() call after map(func) is unnecessary.
The merge() operator is said to perform the following:
Flattens an Observable that emits Observables into a single Observable that emits the items emitted by those Observables, without any transformation.
Now inside flatMap() we can only have an Observable that emits one Observable. Why merge? What am I missing here?
Thank you.
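Looking at the signatures might help: map takes a function from T to R and returns an Observable<R>, while merge takes an Observable<? extends Observable<? extends T>> and returns an Observable<T>.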
Imagine that you have an Observable<String> and want to flatMap each string to its individual characters, using a function that returns an Observable<String> of characters for each source string. What's the type of the resulting Observable? It is an Observable<String>.

Now instead of using flatMap, use map with the same function. What's the type? You'll see that it is actually Observable<Observable<String>>. And if we subscribe to this observable and print out the emitted items, we just get the inner Observable objects themselves rather than any characters. Not very useful. Worse, these Observables haven't been subscribed to, so they won't emit any data :(

We see that the goal of flatMap is to let the function produce an inner Observable<T> per source item, and then subscribe to these inner observables and flatten their emissions together in the output Observable<T>. And merge does just that!

To verify that, wrap the above map result in an Observable.merge(...). The output is then the individual characters, the same as with flatMap.
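To make the map / merge / flatMap relationship concrete without depending on the RxJava jar, here is a minimal sketch using a toy, fully synchronous stand-in for Observable. The names Obs, FlatMapSketch, chars, toList and innerSubscriptions are made up for this illustration, and this is not how RxJava implements these operators internally (no backpressure, errors, completion, or concurrency). It only mirrors the shape of the signatures and the merge(map(func)) body quoted in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class FlatMapSketch {

    /** Toy stand-in for rx.Observable (hypothetical): pushes its items to the callback on subscribe. */
    public interface Obs<T> {
        void subscribe(Consumer<? super T> onNext);

        static <T> Obs<T> from(List<T> items) {
            return onNext -> items.forEach(onNext);
        }

        // Like map(): transforms each item, but never subscribes to anything the function returns.
        default <R> Obs<R> map(Function<? super T, ? extends R> func) {
            return onNext -> subscribe(item -> onNext.accept(func.apply(item)));
        }

        // Like merge(): subscribes to every inner Obs and forwards its items downstream.
        static <T> Obs<T> merge(Obs<? extends Obs<? extends T>> sources) {
            return onNext -> sources.subscribe(inner -> inner.subscribe(onNext));
        }

        // Same shape as the flatMap() body in the question, minus the scalar fast path.
        default <R> Obs<R> flatMap(Function<? super T, ? extends Obs<? extends R>> func) {
            return Obs.merge(map(func));
        }
    }

    // Counts how many times an inner source was actually subscribed to.
    static int innerSubscriptions = 0;

    // Splits a word into one-character strings, emitted by a new inner Obs.
    static Obs<String> chars(String word) {
        List<String> letters = Arrays.asList(word.split(""));
        return onNext -> {
            innerSubscriptions++;
            letters.forEach(onNext);
        };
    }

    // Subscribes to the source and collects everything it emits.
    static <T> List<T> toList(Obs<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Obs<String> words = Obs.from(Arrays.asList("ab", "cd"));

        // map alone: two inner sources come out, and none of them is subscribed to.
        Obs<Obs<String>> mapped = words.map(FlatMapSketch::chars);
        List<Obs<String>> nested = toList(mapped);
        System.out.println(nested.size() + " inner sources, " + innerSubscriptions + " subscribed");
        // prints: 2 inner sources, 0 subscribed

        // merge over the mapped result subscribes to the inner sources and flattens them.
        List<String> merged = toList(Obs.merge(mapped));
        System.out.println(merged); // prints: [a, b, c, d]

        // flatMap gives the same result in one step.
        List<String> flat = toList(words.flatMap(FlatMapSketch::chars));
        System.out.println(flat); // prints: [a, b, c, d]
    }
}
```

With real RxJava the inner Observables may be asynchronous, in which case merge interleaves their emissions as they arrive; the toy version above is synchronous, so the items always come out in source order.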