I have two observables that are created from the same source. They are differentiated by a map that assigns a randomized value to a property of the element being emitted. Here is an example of the logic:
var Rx = require('rx');
var _ = require('lodash');
// create a source that emits a single event, and map that to an empty object
var source = Rx.Observable
.range(0, 1)
.map(function makeObject() { return {}; });
// map the empty object and give each one a type property with the
// value randomly chosen between "a" or "b"
var typed = source.map(function type(obj) {
obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
return obj;
});
// create an observable that only contains "a"
var a = typed.filter(function(obj) {
return obj.type === 'a';
});
// create an observable that only contains "b"
var b = typed.filter(function(obj) {
return obj.type === 'b';
});
// merge both observables and log the result in the subscription
Rx.Observable.merge(a, b).subscribe(function(obj) {
console.log(obj);
});
I would expect that this final merged stream will always produce a single object with either obj.type === 'a'
or obj.type === 'b'
, and then complete.
However, every time I run this script I get various results, some expected, some unexpected.
Expected result "a":
{ type : 'a' }
Expected result "b":
{ type : 'b' }
Unexpected both:
{ type : 'a' }
{ type : 'b' }
And, sometimes I get no output at all. What am I missing here?
The issue is related to the lazy nature of RX:
You have two subscriptions which get created by the merge call, each subscription results in evaluation of all the observable operators.
Which means:
subscription a -> could result in either:
subscription b -> same, either:
If you merge those streams, you'll get either of those results: only a, only b, both a&b, neither of them.
More details
Let's look at a simpler example:
Now in a regular pub-sub systems, we would expect that if I'll add 2 subscribers, each subscriber output the same value:
Only they are not, each will print a different value, because each subscription calls Math.Random() again.
And although it's a bit weird, it's actually the correct behaviour of rx observables, each new subscription causes new evaluation of the observable flow.
Merge subscribe to those two observables (which means two values were created instead of one) and emit the values to a new observable.
To avoid this behaviour we can use the publish operators of RX. There's more detailed explanation on those here:
http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
So, in this case: