I'm trying to use a Flux to process records from a database & pass them to a Kinesis stream - this works really well, but can result in unbounded memory usage if my Kinesis clients get "Backed Up"
I've changed the backpressure to FluxSink.OverflowStrategy.ERROR, which errors when there's not enough demand for the pressure, but this appears to leave me no option to pause the database reads, and attempt to resume after a delay.
What I've got, is something like the following:
Flux.<Map<String, List<Object>>>create((emitter) -> {
/// Boring Database Stuff
while (resultSet.next()) {
emitter.next(valueFromDatabase)
}
}, FluxSink.OverflowStrategy.ERROR)
.subscribeOn(scheduler)
.handle((message, synchronousSink) -> {
// Send Stuff To Kinesis
}
.subscribe();
I had assumed I'd be able to try/catch around the emitter.next() to handle the backpressure error, but it's not caught there. Instead, it just terminates the subscription.
I've tried .onErrorContinue((throwable, o) -> logger.error("Continuing From Error", throwable)) but that doesn't even seem to get called.
I'd like to be able to to just "pause" the read from the database until the pressure subsides, and then continue on with the next record, with some kind of backoff if I keep getting those errors. Is there any way to achieve this?
I ended up creating my own sink for this purpose:
With the sink, I'm able to handle the
FAIL_OVERFLOWerrors manually. In this case,emitNextwill just retry if the 2nd parameter returnsTRUEAlternately, you can use the
sink.tryEmitNext()method, which will return the result (possibly aFAIL_OVERFLOW) which you can then handle however you'd like.Then, to connect it to spring-cloud-stream, I can simply return the sink as a flux:
I'm sure there are downsides to this, but in my relatively simple application, this was sufficient.