I'm using Spring Reactor with Spring Cloud Stream (GCP Pub/Sub Binder) and running into error handling issues. I'm able to reproduce the issue with a very simple example:
    @Bean
    public Function<Flux<String>, Mono<Void>> consumer() {
        return flux -> flux
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .map(msg -> {
                if (true) {
                    throw new RuntimeException("exception encountered!");
                }
                return msg;
            })
            .doOnError(throwable -> log.error("Failed to consume message", throwable))
            .then();
    }
The behavior I expect is to see "Failed to consume message" printed; however, that's not what happens. When I add a .log() call to the chain, I see onNext/onComplete signals where I would expect to see onError signals.
My actual code looks something like this:
    @Bean
    public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
        return flux -> flux
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .flatMap(myService::processMessage) // exception happens deep in here
            .doOnError(throwable -> log.error("Failed to consume message", throwable))
            .then();
    }
I noticed that deep in my service class I was attempting to do error handling on my Reactor publishers. However, the onError signal wouldn't occur when using Spring Cloud Stream. If I simply invoked my service directly, as in myService.processMessage(msg), in a unit test and mocked the exception, my reactive chain would propagate error signals correctly.
It seems to be an issue when I hook in to Spring Cloud Stream. I'm wondering if Spring Cloud Function/Stream is doing any global error wrapping?
In my non-trivial code I do notice this error message that may have something to do with why I'm not getting error signals?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
To further my confusion, I am able to get the onError signal in my reactive chain if I switch my Spring Cloud Stream binding to the non-reactive implementation, like so:
    @Bean
    public Consumer<CustomMessage> consumer(MyService myService) {
        return customMessage -> Mono.just(customMessage)
            .doOnNext(msg -> log.info("New message received: {}", msg))
            .flatMap(myService::processMessage) // exception happens deep in here
            .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
            .subscribe();
    }
So this is what I've gathered from my own investigations, maybe this might help others. Forewarning, I might not be using the right "Spring Reactor Language" but this is how I ended up solving it...
In Hoxton.SR5, an onErrorContinue was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue is that it affects upstream operators by applying the BiConsumer function at the operator that failed (if supported).
This means that when an error occurred in our map/flatMap operators, the onErrorContinue BiConsumer would kick in and modify the downstream signal to either onComplete() (Mono<T>) or request(...) (if it requested a new element from a Flux<T>). This resulted in our doOnError(...) operators not executing since there were no onError() signals. Eventually the SCS team decided to remove this error handling wrapper.
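The effect described above can be reproduced outside Spring Cloud Stream with a small plain-Reactor sketch. The class name and the recorded event strings are made up for illustration, and the trailing onErrorContinue only stands in for whatever the binder applied downstream of the user function (how exactly the binder wired it is an assumption here).

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class OnErrorContinueDemo {

    // Records which signals are observed when map() throws and an
    // onErrorContinue sits further downstream in the chain.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2, 3)
            .map(i -> {
                if (i == 2) {
                    throw new IllegalStateException("boom on " + i);
                }
                return i;
            })
            // Plays the role of the doOnError in the question's chain.
            .doOnError(e -> events.add("doOnError"))
            // Stand-in for the downstream wrapper (assumption): the failing
            // element is handed to this BiConsumer at the map() operator.
            .onErrorContinue((e, value) -> events.add("continue:" + value))
            .subscribe(
                i -> events.add("onNext:" + i),
                e -> events.add("onError"),
                () -> events.add("onComplete"));
        return events;
    }

    public static void main(String[] args) {
        // prints [onNext:1, continue:2, onNext:3, onComplete]
        System.out.println(run());
    }
}
```

Neither "doOnError" nor "onError" is recorded: the failure is consumed at map() and the next element is requested, which matches the onNext/onComplete signals seen in the question.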
Hoxton.SR6 no longer has this onErrorContinue. However, this meant that exceptions propagating up to the SCS binding would result in the Flux subscription being severed. Subsequent messages would then have nowhere to be routed since there were no subscribers.
This error handling has been passed along to the clients: we add an onErrorResume operator to the inner publisher to effectively drop error signals. When an error is encountered within the myService::processMessage publisher, onErrorResume will switch to the fallback publisher that was passed in as a parameter and resume from that point in the operator chain. In our case, this fallback publisher simply returns Mono.empty(), which lets us drop the error signals while still allowing internal error handling mechanisms to operate, without affecting the outer source publisher.
onErrorResume Example/Explanation
The above technique can be illustrated with a very simple example.
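The original snippet for this example is not available here, so the following is only a sketch of what such an example might look like. It assumes a helper func(i) that fails on element 2 and otherwise returns the element unchanged; the class name and that helper's body are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OuterOnErrorResumeDemo {

    // Hypothetical stand-in for func(i): fails on 2, passes everything else through.
    static Mono<Integer> func(Integer i) {
        if (i == 2) {
            return Mono.error(new IllegalStateException("boom on " + i));
        }
        return Mono.just(i);
    }

    static List<Integer> run() {
        return Flux.just(1, 2, 3)
            .flatMap(OuterOnErrorResumeDemo::func)
            // onErrorResume on the OUTER flux: once the error arrives, the
            // source is abandoned and the fallback publisher takes over.
            .onErrorResume(e -> Flux.just(4, 5, 6))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // prints [1, 4, 5, 6]
        System.out.println(run());
    }
}
```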
Here the source is a Flux<Integer>, Flux.just(1, 2, 3), and the onErrorResume fallback is Flux.just(4, 5, 6). Since an error is encountered at element 2, the onErrorResume fallback kicks in and the new publisher becomes Flux.just(4, 5, 6), effectively resuming from the fallback: the subscriber sees whatever was emitted before the error, followed by 4, 5, 6. In our case, we don't want to affect the source publisher (i.e. Flux.just(1, 2, 3)). We just want to drop the erroneous element (2) and continue to the next element (3).
We can't simply change Flux.just(4, 5, 6) to Flux.empty() or Mono.empty(). The sequence would then complete right after the error, and element 3 would still never be emitted.
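A minimal sketch of that variant, again assuming a hypothetical func(i) that fails on element 2 (the class name is also made up for illustration):

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OuterEmptyFallbackDemo {

    // Hypothetical stand-in for func(i): fails on 2, passes everything else through.
    static Mono<Integer> func(Integer i) {
        if (i == 2) {
            return Mono.error(new IllegalStateException("boom on " + i));
        }
        return Mono.just(i);
    }

    static List<Integer> run() {
        return Flux.just(1, 2, 3)
            .flatMap(OuterEmptyFallbackDemo::func)
            // Still on the OUTER flux: the empty fallback replaces the source,
            // so the sequence completes and 3 is never processed.
            .onErrorResume(e -> Mono.empty())
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // prints [1]
        System.out.println(run());
    }
}
```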
This is because onErrorResume has replaced the upstream publishers with the fallback publisher (i.e. Mono.empty()) and resumed from that point on.
To achieve our desired output, with element 2 dropped and element 3 still emitted, we must place the onErrorResume operator on the inner publisher of the flatMap.
Placed there, the onErrorResume only affects the inner publisher returned by func(i). If an error occurs from operators in func(i), onErrorResume will fall back to Mono.empty(), effectively completing the Mono<T> without blowing up. This also still allows error handling operators (e.g. doOnError) within func(i) to be applied before the fallback runs. This is because, unlike onErrorContinue, it does not affect upstream operators and change the next signal at the location of the error.
Final Solution
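Reusing the code snippet in my question, I've upgraded my Spring Cloud version to Hoxton.SR6 and changed the flatMap so that the publisher returned by myService.processMessage gets an onErrorResume with a Mono.empty() fallback. Note that the onErrorResume is on the inner publisher (inside the flatMap), not on the outer flux.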
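The shape of that change can be sketched in plain Reactor, without the Spring Cloud Stream binding. This is not the original code: processMessage below is a hypothetical stand-in for myService.processMessage, integers stand in for CustomMessage, and a list stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class InnerOnErrorResumeDemo {

    // Hypothetical stand-in for myService.processMessage(msg): fails on 2.
    static Mono<Integer> processMessage(Integer msg) {
        if (msg == 2) {
            return Mono.error(new IllegalStateException("boom on " + msg));
        }
        return Mono.just(msg);
    }

    // loggedErrors plays the role of log.error(...) in the question's code.
    static List<Integer> run(List<String> loggedErrors) {
        return Flux.just(1, 2, 3)
            .flatMap(msg -> processMessage(msg)
                // Runs first: the onError signal still exists at this point.
                .doOnError(e -> loggedErrors.add(e.getMessage()))
                // Then only this inner publisher is swapped for an empty one,
                // so the outer flux keeps its subscription.
                .onErrorResume(e -> Mono.empty()))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        List<String> loggedErrors = new ArrayList<>();
        System.out.println(run(loggedErrors)); // prints [1, 3]
        System.out.println(loggedErrors);      // prints [boom on 2]
    }
}
```

In the real consumer the same flatMap body would sit between the doOnNext and the then() of the Function<Flux<CustomMessage>, Mono<Void>> bean.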