Java Flux Exception Handling?

I am new to Flux and trying to understand how to handle the following scenario. Can someone please guide me?

Issue: I am getting a read timeout exception from one of the Flux responses returned by the getResp() method below. When that happens, all the prior successful responses are discarded and only the error is returned.

Instead, I want to return all the successful responses I received prior to the exception.

public Flux<CustomObject1> getInfo(List<RequestObj> requestObjList) {
    return requestObjList.stream()
            .parallel()
            .map(this::getResp)
            .reduce(Flux::merge)
            .orElse(Flux.empty());
}

public Flux<CustomObject1> getResp(RequestObj requestObj){
// process the request and return ...        

}

Please let me know if this is not clear; I am happy to provide more details.

There is 1 answer

Answered by Alex

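There are several ways to handle errors in flatMap.

Use flatMapDelayError, which delays any error until all elements have been processed. The successful responses are emitted first, and onErrorResume then replaces the delayed error with an empty completion: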

public Flux<CustomObject1> getInfo(List<RequestObj> requestObjList) {
    // Queues is reactor.util.concurrent.Queues; the two int arguments are concurrency and prefetch
    return Flux.fromIterable(requestObjList)
            .flatMapDelayError(this::getResp, Queues.SMALL_BUFFER_SIZE, Queues.XS_BUFFER_SIZE)
            .onErrorResume(e -> {
                // log error
                return Mono.empty();
            });
}
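
To see the delayed-error behaviour end to end, here is a minimal runnable sketch. It assumes reactor-core is on the classpath. The class name DelayErrorDemo, the Integer/String types and the stub getResp that always fails for request 2 are illustrative stand-ins, not the original code.

```java
import java.util.List;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

class DelayErrorDemo {

    // Hypothetical stand-in for getResp(): request 2 times out, the others succeed.
    static Flux<String> getResp(int request) {
        if (request == 2) {
            return Flux.error(new TimeoutException("read timeout for request " + request));
        }
        return Flux.just("response-" + request);
    }

    // Same shape as getInfo() above: the error is held back until every inner Flux has finished.
    static Flux<String> getInfo(List<Integer> requests) {
        return Flux.fromIterable(requests)
                .flatMapDelayError(DelayErrorDemo::getResp,
                        Queues.SMALL_BUFFER_SIZE, Queues.XS_BUFFER_SIZE)
                .onErrorResume(e -> {
                    // The delayed error arrives here, after the successful responses were emitted.
                    System.err.println("Ignoring error: " + e);
                    return Mono.empty();
                });
    }

    public static void main(String[] args) {
        List<String> result = getInfo(List.of(1, 2, 3)).collectList().block();
        System.out.println(result);
    }
}
```

The collected list contains response-1 and response-3. The failing request is dropped instead of discarding everything that succeeded.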

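Alternatively, handle the error for each element individually, so that a failed request completes empty and does not affect the others: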

public Flux<CustomObject1> getInfo(List<RequestObj> requestObjList) {
    return Flux.fromIterable(requestObjList)
            .flatMap(request -> 
                    getResp(request)
                            .onErrorResume(e -> {
                                // log error
                                return Mono.empty();
                            })
            );