Retryable grpc-web server-streaming rpc

725 views Asked by At

I am trying to wrap a grpc-web server-streaming client with rxjs.Observable and be able to perform retries if say the server returns an error.

Consider the following code.

  // server
  foo = (call: (call: ServerWritableStream<FooRequest, Empty>): void => {
    if (!call.request?.getMessage()) {
      call.emit("error", { code: StatusCode.FAILED_PRECONDITION, message: "Invalid request" })
    }

    for (let i = 0; i <= 2; i++) {
      call.write(new FooResponse())
    }
    call.end()
  }
 
  // client 
  test("should not end on retry", (done) => {
    new Observable(obs => {
      const call =  new FooClient("http://localhost:8080").foo(new FooRequest())
      call.on("data", data => obs.next(data))
      call.on("error", err => {
        console.log("server emitted error")
        obs.error(err)
      })
      call.on("end", () => {
        console.log("server emitted end")
        obs.complete()
      })
    })
    .pipe(retryWhen(<custom retry policy>))
    .subscribe(
        _resp => () => {},
        _error => {
          console.log("source observable error")
          done()
        },
        () => {
          console.log("source observable completed(?)")
          done()
        })
  })
  
  // output
  server emitted error
  server emitted end
  source observable completed(?)

The server emits the "end" event after(?) emitting "error", so it seems like I have to remove the "end" handler from the source observable.

What would be an "Rx-y" way to end/complete the stream?

1

There are 1 answers

0
j4ckofalltrades On BEST ANSWER

For anyone interested, I ended up removing the "end" event handler and replaced it with "status", if the server returns an OK status code (which signals the end of the stream) then the observable is completed.

new Observable(obs => {
  const call =  new FooClient("http://localhost:8080").foo(new FooRequest())
  call.on("data", data => obs.next(data))
  call.on("error", err => obs.error(err))
  call.on("status", status: grpcWeb.Status => {
    if (status.code == grpcWeb.StatusCode.OK) {
      return observer.complete()
    }
  })
})