Why are these RxJS observables producing strange outputs?

510 views Asked by At

I have two observables that are created from the same source. They are differentiated by a map that assigns a randomized value to a property of the element being emitted. Here is an example of the logic:

var Rx = require('rx');
var _ = require('lodash');

// create a source that emits a single event, and map that to an empty object
var source = Rx.Observable
    .range(0, 1)
    .map(function makeObject() { return {}; });

// map the empty object and give each one a type property with the
// value randomly chosen between "a" or "b"
var typed = source.map(function type(obj) {
    obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
    return obj;
});

// create an observable that only contains "a"
var a = typed.filter(function(obj) {
    return obj.type === 'a';
});

// create an observable that only contains "b"
var b = typed.filter(function(obj) {
    return obj.type === 'b';
});

// merge both observables and log the result in the subscription
Rx.Observable.merge(a, b).subscribe(function(obj) {
    console.log(obj);
});

I would expect that this final merged stream will always produce a single object with either obj.type === 'a' or obj.type === 'b', and then complete.

However, every time I run this script I get various results, some expected, some unexpected.

Expected result "a":

{ type : 'a' }

Expected result "b":

{ type : 'b' }

Unexpected both:

{ type : 'a' }
{ type : 'b' }

And, sometimes I get no output at all. What am I missing here?

1

There are 1 answers

6
Yshayy On BEST ANSWER

The issue is related to the lazy nature of RX:

You have two subscriptions which get created by the merge call, each subscription results in evaluation of all the observable operators.

Which means:

subscription a -> could result in either:

  • Item a is generated and then emitted.
  • Item b is generated and then filtered out

subscription b -> same, either:

  • Item b is generated and then emitted
  • Item a is generated and then filtered out

If you merge those streams, you'll get either of those results: only a, only b, both a&b, neither of them.

More details

Let's look at a simpler example:

 var source = Rx.Observable
    .range(0, 1)
    .map(function () { return Math.random(); })

Now in a regular pub-sub systems, we would expect that if I'll add 2 subscribers, each subscriber output the same value:

source.subscribe(function(x){console.log("sub 1:" + x)})
source.subscribe(function(x){console.log("sub 2:" + x)})

Only they are not, each will print a different value, because each subscription calls Math.Random() again.

And although it's a bit weird, it's actually the correct behaviour of rx observables, each new subscription causes new evaluation of the observable flow.

Merge subscribe to those two observables (which means two values were created instead of one) and emit the values to a new observable.

To avoid this behaviour we can use the publish operators of RX. There's more detailed explanation on those here:

http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html

So, in this case:

var source = Rx.Observable
    .range(0, 1)
    .map(function makeObject() { return {}; });

var typed = source.map(function type(obj) {
    obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
    return obj;
}).replay().refCount();