Why am I getting onComplete signal when an exception is encountered in Spring Cloud Stream reactive consumer?

1.9k views Asked by At

I'm using Spring Reactor with Spring Cloud Stream (GCP Pub/Sub Binder) and running into error handling issues. I'm able to reproduce the issue with a very simple example:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .map(msg -> {
            if (true) { 
                throw new RuntimeException("exception encountered!");
            }
            return msg;
        })
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

The behavior I expect is to see "Failed to consume message" print, however, that's not what appears to happen. When adding a .log() call to the chain I see onNext/onComplete signals, I would expect to see onError signals.

My actual code looks something like this:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

I noticed that deep in my service class I was attempting to do error handling on my Reactor publishers. However, the onError signal wouldn't occur when using Spring Cloud Stream. If I simply invoked my service as such myService.processMessage(msg) in a unit test and mocked the exception, my reactive chain would propagate error signals correctly.

It seems to be an issue when I hook in to Spring Cloud Stream. I'm wondering if Spring Cloud Function/Stream is doing any global error wrapping?

In my non-trivial code I do notice this error message that may have something to do with why I'm not getting error signals?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

To further my confusion, I am able to get the onError signal in my reactive chain if I switch my Spring Cloud Stream binding to the non-reactive implementation as so:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
        .subscribe();
}
2

There are 2 answers

3
Jon Catanio On BEST ANSWER

So this is what I've gathered from my own investigations, maybe this might help others. Forewarning, I might not be using the right "Spring Reactor Language" but this is how I ended up solving it...

In Hoxton.SR5, an onErrorContinue was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue is that it affects upstream operators by applying the BiConsumer function at the operator that failed (if supported).

This means that when an error occurred in our map/flatMap operators, the onErrorContinue BiConsumer would kick in and modify the downstream signal to either onComplete() (Mono<T>) or request(...) (if it requested a new element from a Flux<T>). This resulted in our doOnError(...) operators not executing since there were no onError() signals.

Eventually the SCS team decided to remove this error handling wrapper. Hoxton.SR6 no longer has this onErrorContinue. However, this meant that exceptions propagating up to the SCS binding would result in the Flux subscription being severed. Subsequent messages would then have nowhere to be routed since there were no subscribers.

This error handling has been passed along to the clients, we add an onErrorResume operator to the inner publisher to effectively drop error signals. When an error is encountered within the myService::processMessage publisher, onErrorResume will switch publishers to the fallback publisher that was passed in as a parameter and resume from that point in the operator chain. In our case, this fallback publisher simply returns Mono.empty() which allows us to drop the error signals while still allowing internal error handling mechanisms to operate while also not affecting the outer source publisher.

onErrorResume Example/Explanation

The above technique can be illustrated with a very simple example.

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Flux.just(4, 5, 6))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

The Flux<Integer> above will output the following:

Element: 1
Element: 4
Element: 5
Element: 6

Since an error is encountered at element 2, onErrorResume fallback kicks in and the new publisher becomes Flux.just(4, 5, 6) effectively resuming from the fallback. In our case, we don't want to affect the source publisher (i.e. Flux.just(1, 2, 3)). We want to just drop the erroneous element (2) and continue to the next element (3).

We can't simply change Flux.just(4, 5, 6) to Flux.empty() or Mono.empty() as so:

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

This would cause the following to be output:

Element: 1

This is because onErrorResume has replaced the upstream publishers with the fallback publisher (i.e. Mono.empty()) and resumed from that point on.

To achieve our desired output of:

Element: 1
Element: 3

We must place the onErrorResume operator on the inner publisher of the flatMap:

public Mono<Integer> func(int i) {
    return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}

Flux.just(1, 2, 3)
    .flatMap(i -> func(i)
        onErrorResume(t -> Mono.empty()))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Now, the onErrorResume only effects the inner publisher returned by func(i). If an error occurs from operators in func(i), onErrorResume will fallback to Mono.empty() effectively completing the Mono<T> without blowing up. This also still allows error handling operators (e.g. doOnError) within func(i) to be applied before the fallback runs. This is because, unlike onErrorContinue, it does not affect upstream operators and change the next signal at the location of the error.

Final Solution

Reusing the code-snippet in my question, I've upgraded my Spring Cloud version to Hoxton.SR6 and changed the code to something like this:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(msg -> myService.processMessage(msg)
            .onErrorResume(throwable -> Mono.empty())
        )
        .then();
}

Note that the onErrorResume is on the inner publisher (inside the flatMap).

1
toolkit On

I think the problem exists in the following code:

    .map(msg -> new RuntimeException("exception encountered!"))

The lambda in your map line is returning an exception, not throwing one.