Merge implemented as flatMap

1.2k views Asked by At

Theoretically it should be possible to implement any RxJS operator (except just() and flatMap()) through flatMap(). For instance map() can be implemented as

/**
 * map() expressed purely in terms of flatMap(): every source value is
 * projected through `selector` and re-wrapped as a single-value observable,
 * which flatMap() then flattens back into the output stream.
 *
 * @param {Rx.Observable} source - Observable whose values are transformed.
 * @param {Function} selector - Projection applied to each emitted value.
 * @returns {Rx.Observable} Whatever `source.flatMap(...)` returns.
 */
function map(source, selector) {
  const wrapProjected = (value) => Rx.Observable.just(selector(value));
  return source.flatMap(wrapProjected);
}

How to implement merge() through flatMap()? (avoiding mergeAll() too, of course)

1

There is 1 answer

0
paulpdaniels On BEST ANSWER

It looks possible if you take advantage of the fact that the selector passed to flatMap can also return an array, not just an observable.

/**
 * merge() built from just() and flatMap() only.
 *
 * @param {Rx.Observable} other - Observable to merge with the receiver.
 * @returns {Rx.Observable} Result of flattening `[this, other]` twice.
 */
Rx.Observable.prototype.merge = function (other) {
  // Start from a single emission holding both observables in an array.
  const pair = Rx.Observable.just([this, other]);
  // First flatMap: the selector returns the array itself, so it is flattened
  // into an observable of observables.
  const streams = pair.flatMap((arr) => arr);
  // Second flatMap: flatten out the inner observables.
  return streams.flatMap((inner) => inner);
};

EDIT 1

Using RxJS 6 and the pipe syntax

import {of} from 'rxjs'
import {flatMap} from 'rxjs/operators'

/**
 * Pipeable merge() built from of() and flatMap() only (RxJS 6 pipe syntax).
 *
 * @param {Observable} other - Observable to merge with the piped source.
 * @returns {Function} Operator function: takes the source observable and
 *   returns the result of flattening `[source, other]` twice.
 */
function merge(other) {
  return (source) =>
    of([source, other]).pipe(
      // Flattens the array into an observable of observables
      flatMap((arr) => arr),
      // Flattens out the inner observables
      flatMap((x) => x)
    );
}

const {timestamp, map, flatMap, take} = rxjs.operators;
const {interval, of: just} = rxjs;

// First demo stream: ticks from interval(2000), each rendered as a
// labelled string carrying its timestamp and counter value.
const source1 = interval(2000).pipe(
  timestamp(),
  map(({ timestamp: emittedAt, value }) => "Interval 1 at " + emittedAt + " w/ " + value)
);

// Second demo stream: same shape, built from interval(3000).
const source2 = interval(3000).pipe(
  timestamp(),
  map(({ timestamp: emittedAt, value }) => "Interval 2 at " + emittedAt + " w/ " + value)
);

/**
 * Pipeable merge built from `just` and `flatMap` only.
 *
 * @param {Observable} other - Observable to merge with the piped source.
 * @returns {Function} Operator function taking the source observable.
 */
function mergeFromFlatMap(other) {
  return (source) => {
    const pair = just([source, other]);
    // First pass turns the emitted array into a stream of observables.
    const unpackArray = flatMap((streams) => streams);
    // Second pass flattens each inner observable into the output.
    const unpackStreams = flatMap((stream) => stream);
    return pair.pipe(unpackArray, unpackStreams);
  };
}

// Merge the two interval streams via the flatMap-based operator, cap the
// output at 20 items, and log each one.
const merged = source1.pipe(mergeFromFlatMap(source2), take(20));
merged.subscribe(console.log.bind(console));
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>