Queue overflow after subscriber cancelling subscription

37 views Asked by At

I am new to ReactiveX and debugging a functional test issue. In the test, I am sending event streams to a WebSocket client, and it works perfect with less than 4 events, but starting with 5 events it constantly fails.

The event streams are sent synchronously to the WebSocketClient from the integration test-

inputEvents.stream().forEach(event -> {
    webSocketClient.sentMessage(event);
});

In the server side logs, I can see some statements about processing the first event -

Received event #1
Subscriber canceled subscription
Subscriber canceled subscription

followed by this error

io.reactivex.rxjava3.exceptions.QueueOverflowException: Queue overflow due to illegal concurrent onNext calls or a bug in an operator.

The server side code looks like

var publisher = Flowable.fromPublisher(inputstream.getPublisher())
    .onBackpressure()
    .map(event -> {
      synchronized (event) {
        log.debug("Received event {}", eventNum);
        return someTransformation(event)
      }
    })
    .doOnError(...)
    .doOnTerminate(...)
    .doOnCancel(...)
    .doOnComplete(...)
    .onBackpressureBuffer()
    .observeOn(Schedulers.computation());

ChatGPT said the subscription cancellation could cause the queue overflow error and suggest looking into the cancellation. It said the subscriber could proactively cancel the subscription when there is a risk of out of memory.

Therefore, I tried replacing the first onBackpressure() with .onBackpressureLatest() or .onBackpressureDrop() but it didn't help. Also, I tried reducing the event content size to minimum (a few bytes) and it didn't help either. Only the number of events seems to be relevant here.

Any thoughts here?

0

There are 0 answers