RxJS combineLatest emit only once per path

430 views Asked by At

Is there any way/pattern to use combineLatest() or some other operator so that if the combined observables depend on one another, they will only emit only once for each set of paths with the same origin in a DAG? I think it may be easier to explain with a diagram.

Diagram:

   A C
  /| |
 B | |
  \|/
   D

Here B is subscribed to A, and D is subscribed to A, B, and C. The default behavior is that if A emits, D emits twice: once when A emits, and once more when B emits (as a result of A emitting). I would like it to emit only once, after both have emitted. However, if C emits, then D should emit immediately as well.

Here's the code:

const A = new Rx.BehaviorSubject(1);
const B = A.pipe(Rx.map((x) => x + 1));
const C = new Rx.BehaviorSubject(3);

const D = Rx.combineLatest({ A, B, C });

D.subscribe(console.log); // {A: 1, B: 2, C: 3}
A.next(2); // Two emissions: {A: 2, B: 2, C: 3}, {A: 2, B: 3, C: 3} 
// Would like the second one ONLY, i.e. {A: 2, B: 3, C: 3}
C.next(4); // Correctly emits: {A: 2, B: 3, C: 4}

One solution I've tried is to linearize it and make the entire collection of A, B, and C an observable:

 {A, C}
    |
{A, C, B}
    |
   {D}

This works but I wonder if there's perhaps a better way.

2

There are 2 answers

2
Picci On

If I understand right the question, you want that the final Observable D emits any time either A (the upstream of B) or C emit but, if A emits, you want also the value emitted by B.

If this is the case, I would make sure that any time B emits, it emits not only the value of B but also the value notified by its upstream A, and then I would pass to combineLatest only B and C, like this

const A = new BehaviorSubject(1);
const B = A.pipe(map((x) => [x, x + 1]));
const C = new BehaviorSubject(3);

const D = combineLatest({ B, C }).pipe(
  map((val) => ({ A: val.B[0], B: val.B[1], C: val.C }))
);
0
Mark van Straten On

you can zip A,B, together so for every pair of emissions you get only one in D. Combine that with C which is just one:

const A = new BehaviorSubject(1);
const B = A.pipe(map((x) => [x, x + 1]));
const C = new BehaviorSubject(3);


const D = combineLatest({ zip(A,B), C })

This requires both A & B to emit a value but because those depend on eachother that is not an issue in your example.

https://www.learnrxjs.io/learn-rxjs/operators/combination/zip