I am trying to wrap a grpc-web server-streaming client with rxjs.Observable and be able to perform retries if say the server returns an error.
Consider the following code.
// server
foo = (call: (call: ServerWritableStream<FooRequest, Empty>): void => {
if (!call.request?.getMessage()) {
call.emit("error", { code: StatusCode.FAILED_PRECONDITION, message: "Invalid request" })
}
for (let i = 0; i <= 2; i++) {
call.write(new FooResponse())
}
call.end()
}
// client
test("should not end on retry", (done) => {
new Observable(obs => {
const call = new FooClient("http://localhost:8080").foo(new FooRequest())
call.on("data", data => obs.next(data))
call.on("error", err => {
console.log("server emitted error")
obs.error(err)
})
call.on("end", () => {
console.log("server emitted end")
obs.complete()
})
})
.pipe(retryWhen(<custom retry policy>))
.subscribe(
_resp => () => {},
_error => {
console.log("source observable error")
done()
},
() => {
console.log("source observable completed(?)")
done()
})
})
// output
server emitted error
server emitted end
source observable completed(?)
The server emits the "end" event after(?) emitting "error", so it seems like I have to remove the "end" handler from the source observable.
What would be an "Rx-y" way to end/complete the stream?
For anyone interested, I ended up removing the "end" event handler and replaced it with "status", if the server returns an
OK
status code (which signals the end of the stream) then the observable is completed.