HTTP Longpolling with changing request parameters using rxjs


I need to implement HTTP long polling using RxJS (in an Angular environment). The challenge is that I need to change the request parameters on every call. To be more specific: I need to update the from/to values so that the time window rolls forward. For example, the window should always go back 1 minute from the current time.

Right now I have the following implementation:

const requestParameter$ = new BehaviorSubject<MetricAggregationParams>(initialRequestParameter);

this.metricsService
  .searchAggregatedMetrics(requestParameter$)
  .pipe(
    tap((metricInstancesResult) => {
      // do something with the result
    }),
    delay(3000),
    tap(() => {
      requestParameter$.next({
        ...requestParameter$.value,
        from: DateTime.now()
          .minus({ minutes: timerange.timerangeInMinutes as number })
          .toISO(),
        to: DateTime.now().toISO()
      });
    })
  )
  .subscribe();

searchAggregatedMetrics(requestParameter$: BehaviorSubject<MetricAggregationParamsDto>) {
  return requestParameter$.pipe(
    concatMap((requestParameter) =>
      this.http.post<MetricAggregationResult>(`REST_URL`, requestParameter)
    )
  );
}

Here are some constraints:

  • The first request should start right away (no delay)
  • The next request should start after the previous request has finished + 3000ms

Is there a way to have the longpolling logic all together in the searchAggregatedMetrics method?

1 answer

Picci

If I understand the problem correctly, you are facing a recursive problem: each request can only be prepared once the previous one has finished. In RxJS, recursion is handled with the expand operator.
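As a minimal illustration of how expand recurses (a sketch that is not part of the original code; the function name doublings is made up for this example): expand passes every value it emits back into the function it was given, so a single seed value turns into a whole sequence. take is needed because the recursion would otherwise never end.

```typescript
import { of } from "rxjs";
import { expand, take } from "rxjs/operators";

// Hypothetical helper: collects the first `count` values produced by
// repeatedly doubling `start`.
function doublings(start: number, count: number): number[] {
  const out: number[] = [];
  of(start)
    .pipe(
      // every emitted value x is fed back in and produces x * 2
      expand((x) => of(x * 2)),
      // stop the otherwise endless recursion
      take(count)
    )
    .subscribe((x) => out.push(x));
  // all Observables here are synchronous, so `out` is already filled
  return out;
}

console.log(doublings(1, 5));
```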

One solution to your problem could be along these lines:

import { of } from "rxjs"
import { delay, expand, map, take, tap } from "rxjs/operators"

// counter used to simulate the fact that we change input at every call
let counter = 0

// function to simulate a rest service call
function restService(input: number) {
  return of(input).pipe(
    tap(input => console.log("Input received " + input)),
    map(input => "response " + input)
  )
}

restService(0).pipe(
  // wait before handling the response and preparing the next call
  delay(300),
  tap(resp => console.log("do stuff with resp " + resp)),
  map(() => {
    console.log("prepare next input " + counter++)
    return counter
  }),
  // expand is an operator that takes as input a function that
  // returns an Observable; it applies that function recursively
  // to every value emitted, starting with the value from the source
  expand(input => {
    return restService(input).pipe(
      delay(300),
      tap(resp => console.log("do stuff with resp " + resp)),
      map(() => {
        console.log("prepare next input " + counter++)
        return counter
      })
    )
  }),
  // we use take just to finish the iteration after a certain number of calls
  take(10)
).subscribe()

There is clearly some repetition in the code above, so it can be written a bit more elegantly (but maybe less clearly) like this:

function restServiceOnSteroids(input: number) {
  return restService(input).pipe(
    delay(300),
    map(resp => {
      console.log("do stuff with resp " + resp)
    }),
    map(() => {
      console.log("prepare next input " + counter++)
      return counter
    })
  )
}

restServiceOnSteroids(counter).pipe(
  expand(input => restServiceOnSteroids(input)),
  take(7)
).subscribe()

Here is a StackBlitz that reproduces this solution.
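Applied to the question, the same idea might look roughly like the sketch below. It rests on a few assumptions: poll, rollingWindow and WindowParams are names invented for this example, plain Date is used instead of luxon's DateTime to keep the block dependency-free, and the request Observable is assumed to emit exactly once and then complete (as Angular's HttpClient does). The first request is sent as soon as you subscribe; each following request is built with fresh from/to values and sent pauseMs after the previous response has arrived. An error in any request ends the polling unless it is caught inside request.

```typescript
import { Observable, timer } from "rxjs";
import { expand, switchMap } from "rxjs/operators";

// Hypothetical parameter shape; the real request type has more fields.
interface WindowParams {
  from: string;
  to: string;
}

// Builds a time window that ends at `now` and starts `minutes` earlier.
function rollingWindow(now: Date, minutes: number): WindowParams {
  return {
    from: new Date(now.getTime() - minutes * 60000).toISOString(),
    to: now.toISOString(),
  };
}

// Generic polling helper based on expand.
// - request:    performs one call for the given parameters
// - nextParams: called right before every call, so parameters are always fresh
// - pauseMs:    pause between a response and the next request
function poll<P, R>(
  request: (params: P) => Observable<R>,
  nextParams: () => P,
  pauseMs: number
): Observable<R> {
  return request(nextParams()).pipe(
    // for every response: wait, then issue the next request
    expand(() =>
      timer(pauseMs).pipe(switchMap(() => request(nextParams())))
    )
  );
}

// Possible use inside the service from the question (untested sketch):
//
// searchAggregatedMetrics(base: MetricAggregationParams, minutes: number) {
//   return poll(
//     (params) => this.http.post<MetricAggregationResult>(`REST_URL`, params),
//     () => ({ ...base, ...rollingWindow(new Date(), minutes) }),
//     3000
//   );
// }
```

With this shape the caller only subscribes to the returned Observable and handles results; unsubscribing (or adding take/takeUntil) stops the polling.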