I am trying to handle an exception thrown from a Rabbit listener and expose the error through the REST endpoint.
Rabbit Listener
@Queue(ProductTopicConstants.GET_PRODUCT)
public ProductViewModel find(String id) {
    try {
        LOG.info(String.format("Listener --> Getting product for specified id = %s", id));
        ProductSearchCriteria criteria = new ProductSearchCriteria();
        criteria.setId(id);
        Bson query = QueryBuilder.QueryBuilder(criteria, Bson.class).get(0);
        ProductViewModel productViewModel = Single.fromPublisher(
                repository.getCollection(ProductConstrants.PRODUCT_COLLECTION_NAME, Product.class)
                        .find(query)).map(successValue -> {
                    return new ProductViewModel(
                            successValue.getId().toString(),
                            successValue.getName(),
                            successValue.getDescription(),
                            successValue.getPrice()
                    );
                }).blockingGet();
        return productViewModel;
    } catch (Exception ex) {
        throw ex;
    }
}
The listener rethrows the exception, and it is handled in the RabbitListenerExceptionHandler shown below:
@Singleton
@Primary
@Replaces(DefaultRabbitListenerExceptionHandler.class)
public class RabbitListenerExceptionHandler implements io.micronaut.rabbitmq.exception.RabbitListenerExceptionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitListenerExceptionHandler.class);

    @Override
    public void handle(RabbitListenerException exception) {
        if (LOG.isErrorEnabled()) {
            Optional<RabbitConsumerState> messageState = exception.getMessageState();
            if (messageState.isPresent()) {
                //LOG.error("Error processing a message for RabbitMQ consumer [" + exception.getListener() + "]", exception);
                throw new GlobalException();
            } else {
                LOG.error("RabbitMQ consumer [" + exception.getListener() + "] produced an error", exception);
            }
        }
    }
}
I also have a global exception handler that handles HTTP requests and responses.
public class GlobalException extends RuntimeException {
}

@Produces
@Singleton
@Requires(classes = {GlobalException.class, ExceptionHandler.class})
public class GlobalExceptionHandler implements ExceptionHandler<GlobalException, HttpResponse> {
    @Override
    public HttpResponse handle(HttpRequest request, GlobalException exception) {
        return HttpResponse.ok(0);
    }
}
When I throw new GlobalException() from RabbitListenerExceptionHandler, the exception is not caught by GlobalExceptionHandler. Instead, I get the following exception:
fete.bird.common.extension.GlobalException: null
at fete.bird.common.extension.RabbitListenerExceptionHandler.handle(RabbitListenerExceptionHandler.java:25)
at fete.bird.common.extension.RabbitListenerExceptionHandler.handle(RabbitListenerExceptionHandler.java:14)
at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice.handleException(RabbitMQConsumerAdvice.java:343)
at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice.access$600(RabbitMQConsumerAdvice.java:67)
at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice$1.doHandleDelivery(RabbitMQConsumerAdvice.java:255)
at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice$1.handleDelivery(RabbitMQConsumerAdvice.java:284)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedRunnable.run(InvocationInstrumenterWrappedRunnable.java:47)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
08:41:35.266 [RxComputationThreadPool-3] ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: The source did not signal an event for 10000000000 nanoseconds and has been terminated.
java.util.concurrent.TimeoutException: The source did not signal an event for 10000000000 nanoseconds and has been terminated.
at io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutSubscriber.onTimeout(FlowableTimeoutTimed.java:139)
at io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutTask.run(FlowableTimeoutTimed.java:170)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
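Exception handlers used for HTTP have no relationship to the ones used for handling messaging exceptions, and they are not designed to be called as a result of exceptions thrown from listeners or listener exception handlers. The stack traces are consistent with this: the GlobalException is thrown on a RabbitMQ consumer thread, outside of any HTTP request, and the HTTP side then fails separately with a TimeoutException, apparently because no reply ever reaches it.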
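One way to still surface the failure over HTTP is to return the error as data instead of throwing it, and to raise the exception only once the reply is back on the HTTP side. The following is a minimal plain-Java sketch of that idea, with no Micronaut or RabbitMQ code in it. It assumes the endpoint calls the listener in RPC style and waits for a reply, which the listener's return value and the timeout in the trace suggest. Reply, listenerFind, and controllerGet are hypothetical names introduced for illustration, and GlobalException here is a stand-in for the class in the question with a message added.

```java
import java.util.function.Function;

final class RpcErrorEnvelopeSketch {

    /** Hypothetical reply envelope: carries either a value or an error message. */
    static final class Reply<T> {
        final T value;
        final String error;

        private Reply(T value, String error) {
            this.value = value;
            this.error = error;
        }

        static <T> Reply<T> ok(T value) {
            return new Reply<>(value, null);
        }

        static <T> Reply<T> failed(String error) {
            return new Reply<>(null, error);
        }

        boolean isError() {
            return error != null;
        }
    }

    /** Stand-in for the question's GlobalException (assumption: a message is added). */
    static final class GlobalException extends RuntimeException {
        GlobalException(String message) {
            super(message);
        }
    }

    /**
     * Listener side (hypothetical): catch the failure and return it as data,
     * so the waiting caller always receives a reply instead of timing out.
     */
    static Reply<String> listenerFind(String id, Function<String, String> lookup) {
        try {
            return Reply.ok(lookup.apply(id));
        } catch (RuntimeException ex) {
            String message = ex.getMessage() != null ? ex.getMessage() : ex.getClass().getName();
            return Reply.failed(message);
        }
    }

    /**
     * HTTP side (hypothetical): unwrap the reply and throw there, in the code
     * path where an HTTP exception handler is able to see the exception.
     */
    static String controllerGet(Reply<String> reply) {
        if (reply.isError()) {
            throw new GlobalException(reply.error);
        }
        return reply.value;
    }

    public static void main(String[] args) {
        Reply<String> ok = listenerFind("42", id -> "product-" + id);
        System.out.println(controllerGet(ok));

        Reply<String> failed = listenerFind("missing", id -> {
            throw new IllegalStateException("not found: " + id);
        });
        try {
            controllerGet(failed);
        } catch (GlobalException ex) {
            System.out.println("HTTP layer saw: " + ex.getMessage());
        }
    }
}
```

With this shape, the listener exception handler is left for logging and message acknowledgement concerns, and the translation into an HTTP response stays entirely within the HTTP layer.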