Reactive pattern for combining operators where one is dependent on another


I have an app that consumes a REST service. It makes a request every time someone, for example, clicks a button. However, it needs to get a token first and then refresh it every 20 minutes thereafter. Here is a marble diagram that represents the desired combination of the streams:

TOKEN SOURCE   ---------A--------------B--------------C----------------D-
REQUEST SOURCE ----1-----------2--3---------4-----------5-------6--------


RESULT SOURCE  ---------A1-----A2-A3--------B4----------C5------C6-------

Typically, a request triggers a result, which combines that request with the latest value from the token source. The exception is when no token has been emitted yet: in that case the request is buffered until the first token arrives and is then sent.
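To pin the intended behaviour down, here is a minimal plain-JavaScript model of it, with no Rx library involved. It is only a sketch for checking a candidate operator chain against the diagram: the function name `pairRequestsWithTokens`, the event format, and the timestamps (rough column positions read off the diagram) are all assumptions made for illustration.

```javascript
// Plain-JavaScript model of the desired pairing (not an Rx operator).
// Each event is [time, kind, value]; times are made-up positions from the diagram.
function pairRequestsWithTokens(events) {
  const sorted = [...events].sort((a, b) => a[0] - b[0]);
  let token;              // latest token seen so far
  let hasToken = false;
  const pending = [];     // requests that arrived before the first token
  const results = [];

  for (const [, kind, value] of sorted) {
    if (kind === 'token') {
      token = value;
      hasToken = true;
      // Flush requests that were buffered while no token was available.
      while (pending.length > 0) results.push(token + pending.shift());
    } else if (hasToken) {
      results.push(token + value);  // normal case: the request triggers the output
    } else {
      pending.push(value);          // no token yet: hold the request
    }
  }
  return results;
}

const diagram = [
  [9, 'token', 'A'], [24, 'token', 'B'], [39, 'token', 'C'], [56, 'token', 'D'],
  [4, 'request', 1], [16, 'request', 2], [19, 'request', 3], [29, 'request', 4],
  [41, 'request', 5], [48, 'request', 6],
];
console.log(pairRequestsWithTokens(diagram).join(' ')); // A1 A2 A3 B4 C5 C6
```

Note that a token arriving on its own (B, C, D) produces no output; only request 1, which arrived before any token, is released by a token emission.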

The combineLatest operator is almost there, but it triggers when either stream emits. The marble diagram for the sample operator also looks close, but it throttles the output based on a time interval, which is not what I want.

Which operator/chain of operators would work for this instance?

I'm prototyping with RxJS, but I need to implement this in RxSwift.


There are 2 answers

Asti (best answer)

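You were close to the solution. The important detail is that every element of the request source (s2 below) must appear in the output, while the token source (s1) must never trigger an output on its own. Keying distinctUntilChanged on the request value uses that extra bit of information to discard the emissions where only the token changed. This relies on consecutive requests carrying distinct values.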

s1.combineLatest(s2, (a, b) => [a, b])
  .distinctUntilChanged(v => v[1])

Here's the test setup:

var s1 = Rx.Observable.interval(997).take(10).map(i => String.fromCharCode(65 + i))
var s2 = Rx.Observable.interval(251).take(10).delay(500).skip(1)

s1.combineLatest(s2, (a, b) => [a, b])
  .distinctUntilChanged(v => v[1])
  .subscribe(value => console.log(value))

Output:

["A",1]
["A",2]
["A",3]
["A",4]
["B",5]
["B",6]
["B",7]
["B",8]
["C",9]
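To see where this chain matches the question's diagram and where it differs, here is a rough plain-JavaScript model of combineLatest followed by distinctUntilChanged keyed on the request. It approximates the operators' behaviour and is not RxJS itself; the function name `combineThenDistinctByRequest` and the event format are made up for the sketch.

```javascript
// Model of combineLatest + distinctUntilChanged(key = request). Not RxJS itself.
// Events are [kind, value] pairs, assumed to be listed in time order.
function combineThenDistinctByRequest(events) {
  let token, request;
  let hasToken = false, hasRequest = false;
  let hasLastKey = false, lastKey;
  const out = [];

  for (const [kind, value] of events) {
    if (kind === 'token') { token = value; hasToken = true; }
    else { request = value; hasRequest = true; }

    if (!hasToken || !hasRequest) continue;           // combineLatest waits for both sides
    if (hasLastKey && lastKey === request) continue;  // same key as last output: dropped
    lastKey = request;
    hasLastKey = true;
    out.push([token, request]);
  }
  return out;
}

// A token refresh between distinct requests: the token-only emission is dropped.
const normal = combineThenDistinctByRequest([
  ['token', 'A'], ['request', 1], ['request', 2], ['token', 'B'], ['request', 3],
]);
console.log(JSON.stringify(normal));   // [["A",1],["A",2],["B",3]]

// Two requests before the first token: only the latest one is paired.
const early = combineThenDistinctByRequest([
  ['request', 1], ['request', 2], ['token', 'A'],
]);
console.log(JSON.stringify(early));    // [["A",2]]

// Two consecutive requests with the same payload: the second one is dropped.
const repeated = combineThenDistinctByRequest([
  ['token', 'A'], ['request', 'save'], ['request', 'save'],
]);
console.log(JSON.stringify(repeated)); // [["A","save"]]
```

The last two cases are the ones to watch: if several requests can arrive before the first token, or if request payloads can repeat, tagging each request with a unique id before combining is one way around it.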

Olaf Horstmann

Since you want emissions to be triggered only by your REQUEST SOURCE, it is probably best to start your stream with that, then switch (or in this case flatMap) onto the TOKEN SOURCE and combine the two. No advanced operators are required:

(Note: I'm assuming that your TOKEN SOURCE replays the latest token.)

// Simulating the token source: a new token every second, replaying the latest token
var token$ = Rx.Observable.interval(1000).take(10).map(() => Math.random()).publishReplay(1);
// Simulating the request source: a new request every 250 ms
var requests$ = Rx.Observable.interval(250).take(10);
// publishReplay returns a connectable observable, so connect it to start the tokens
token$.connect();

requests$
    .flatMap(request => token$
        .take(1)
        .map(token => ({request, token}))
    )
    .subscribe(function(requestWithToken) {
        console.log("Matched: ", requestWithToken);
    });

You can check out the live fiddle here: https://jsfiddle.net/rn8rzufg/
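If it helps to see why the replay matters, here is a small plain-JavaScript stand-in for "a token source that replays its latest value, read with take(1)". It is a sketch of the idea only: the class `LatestTokenCache` and its methods are hypothetical names invented for this example and are not part of RxJS or RxSwift.

```javascript
// Hypothetical stand-in for a token source that replays its latest value.
class LatestTokenCache {
  constructor() {
    this.hasValue = false;
    this.value = undefined;
    this.waiting = [];          // callbacks registered before the first token
  }

  // Called whenever a new token is issued.
  next(token) {
    this.value = token;
    this.hasValue = true;
    const callbacks = this.waiting;
    this.waiting = [];
    callbacks.forEach(cb => cb(token));
  }

  // Rough equivalent of `token$.take(1)`: deliver exactly one token per call,
  // either the cached one right away or the first one to arrive.
  takeOne(cb) {
    if (this.hasValue) cb(this.value);
    else this.waiting.push(cb);
  }
}

const tokens = new LatestTokenCache();
const matched = [];
const sendRequest = id =>
  tokens.takeOne(token => matched.push({ request: id, token }));

sendRequest(1);       // no token yet, so this request waits
tokens.next('A');     // request 1 is now matched with A
sendRequest(2);       // matched with A immediately
tokens.next('B');     // a refresh on its own matches nothing
sendRequest(3);       // matched with B

console.log(JSON.stringify(matched));
```

Each request is matched exactly once, and every request made before the first token is kept rather than overwritten, which is the buffering behaviour the question asks for.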