I am new to ReactiveX and am debugging a functional test issue. In the test, I send a stream of events to a WebSocket client. It works perfectly with fewer than 4 events, but starting with 5 events it fails consistently.
The events are sent synchronously to the WebSocketClient from the integration test:
    inputEvents.stream().forEach(event -> {
        webSocketClient.sentMessage(event);
    });
In the server-side logs, I can see some statements about processing the first event:
    Received event #1
    Subscriber canceled subscription
    Subscriber canceled subscription
followed by this error:
    io.reactivex.rxjava3.exceptions.QueueOverflowException: Queue overflow due to illegal concurrent onNext calls or a bug in an operator.
The server-side code looks like this:
    var publisher = Flowable.fromPublisher(inputstream.getPublisher())
        .onBackpressure()
        .map(event -> {
            synchronized (event) {
                log.debug("Received event {}", eventNum);
                return someTransformation(event);
            }
        })
        .doOnError(...)
        .doOnTerminate(...)
        .doOnCancel(...)
        .doOnComplete(...)
        .onBackpressureBuffer()
        .observeOn(Schedulers.computation());
ChatGPT said the subscription cancellation could cause the queue overflow error and suggested looking into the cancellation. It said the subscriber could proactively cancel the subscription when there is a risk of running out of memory.
Therefore, I tried replacing the first onBackpressure() with .onBackpressureLatest() or .onBackpressureDrop(), but it didn't help. I also tried reducing the event content size to a minimum (a few bytes), and that didn't help either. Only the number of events seems to be relevant here.
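Since the error message mentions illegal concurrent onNext calls, one thing I am considering is checking whether events really reach the pipeline from more than one thread at a time. Below is a minimal sketch of such a check using only the JDK. The names OnNextOverlapDetector and demo are made up for illustration; they are not part of RxJava or of my server code, and the demo only simulates two overlapping calls with latches.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical debugging helper (not an RxJava class): wraps a callback and
// counts how often it is entered while another call is still in progress.
final class OnNextOverlapDetector<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private final AtomicBoolean inProgress = new AtomicBoolean(false);
    private final AtomicInteger overlaps = new AtomicInteger();

    OnNextOverlapDetector(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T item) {
        // compareAndSet fails if another call is already inside accept().
        if (!inProgress.compareAndSet(false, true)) {
            overlaps.incrementAndGet();
            return; // only record the overlap; do not forward this call
        }
        try {
            delegate.accept(item);
        } finally {
            inProgress.set(false);
        }
    }

    int overlaps() {
        return overlaps.get();
    }

    // Simulation: the first call is held inside the delegate until a second
    // call has arrived from another thread, so exactly one overlap occurs.
    static int demo() {
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        OnNextOverlapDetector<String> detector = new OnNextOverlapDetector<>(item -> {
            entered.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread first = new Thread(() -> detector.accept("event-1"));
        first.start();
        try {
            entered.await();            // first call is now inside the delegate
            detector.accept("event-2"); // second call overlaps with the first
            release.countDown();
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return detector.overlaps();
    }

    public static void main(String[] args) {
        System.out.println("overlapping calls: " + demo());
    }
}
```

If I understand the API correctly, a detector like this could be hooked in with something like .doOnNext(detector::accept) directly after Flowable.fromPublisher(...); a non-zero overlap count there would suggest that inputstream.getPublisher() is emitting from several threads at once. I have not confirmed that this is what happens in my case.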
Any thoughts here?