Quarkus Panache Repository call silently fails when called from an EventBus @ConsumeEvent

Quarkus 1.8.3.Final

A direct call to a method that uses a PanacheRepository works as expected. When the same method is invoked through the EventBus, execution reaches the method and runs every line up to the first repository call, then stops silently: no exception, no log entry, no indication of what happened.

According to the logs, the direct call runs on the Quarkus main thread, while the EventBus call runs on a Vert.x event loop thread (vert.x-eventloop-thread-2).

I also tried combinations of the following, all with the same result:

  • Wrapped the consumer side of the EventBus in a Mutiny Uni.
  • Made the consumer return a Uni.
  • Set blocking=false explicitly on the consumer (this is the default).
  • Tried both the io.vertx.core.eventbus.EventBus and io.vertx.mutiny.core.eventbus.EventBus implementations.
  • Placed the consumer in the same service and in a different one.
  • Put the @Transactional annotation on the called method.

The repository and the service:

@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }

@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {

    @Inject
    TestEntityRepository repository;

    @Inject
    EventBus eventBus;

    public void startUp(@Observes StartupEvent event) {
        // Call via the event bus prints the log in listEntities() and silently stops when it reaches the repository call.
        eventBus.sendAndForget("test_topic", "eventbus call");
        // This  prints the log in listEntities() and then lists all the entities in the repository.
        listEntities("direct call");
    }

    @ConsumeEvent("test_topic")
    public void listEntities(String testMessage) {
        log.info("Printing all entities via: " + testMessage);
        repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
    }
}

Here is an excerpt from the EventBus part of the log:

2020-10-19 21:24:39,612 INFO  [org.acm.com.TestService] (vert.x-eventloop-thread-1) Printing all entities via: eventbus call
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )

Update: a bug report has been created.

Answer by Péter Veres (marked as best answer):

It turned out that the EventBus was swallowing the exception raised for the blocking call on the event loop. Returning a reactive type such as a Uni, or even explicitly wrapping the database call in a Uni, does not help: the call still runs on the same thread rather than on a worker thread, as I had expected.
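
How a fire-and-forget dispatch can lose an exception is easy to reproduce outside Quarkus. The sketch below is a plain-JDK analogy only, not Quarkus or Vert.x code: a single-threaded executor stands in for the event loop, and the class name, thread name, and exception message are all made up for illustration. The handler throws, but nothing is reported unless the caller asks the returned Future for its result.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK analogy with made-up names; this is not Quarkus or Vert.x code.
final class SwallowedFailureSketch {

    /**
     * Submits a failing handler to a one-thread "event loop".
     * Returns the failure message if the caller inspects the result, otherwise null.
     */
    static String runAndInspect(boolean inspectResult) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "fake-eventloop-thread"));
        // Stand-in for a guard that rejects blocking work on the event loop.
        Runnable failingHandler = () -> {
            throw new IllegalStateException(
                    "blocking call on " + Thread.currentThread().getName());
        };
        try {
            Future<?> result = eventLoop.submit(failingHandler);
            if (!inspectResult) {
                // Fire-and-forget: the exception stays inside the Future, unseen.
                return null;
            }
            result.get(5, TimeUnit.SECONDS);
            return null; // only reached if the handler did not fail
        } catch (ExecutionException e) {
            // Only a caller that asks for the result learns about the failure.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            return "timed out";
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fire-and-forget observed: " + runAndInspect(false));
        System.out.println("inspected result observed: " + runAndInspect(true));
    }
}
```

In the fire-and-forget case the caller observes nothing at all, which mirrors the silent stop seen in the question. In the second case the failure message comes back through the Future.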

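The solution was to set blocking=true on the event listener. The flag does not declare how the consumer is expected to behave (as I had assumed); it tells Quarkus that the consumer makes blocking calls, so the consumer is run on a worker thread instead of the event loop.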

The working code is:

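// Imports for Quarkus 1.8.x (javax namespace); sendAndForget() comes from the Mutiny EventBus.
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.transaction.Transactional;

import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;

@Slf4j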
@Startup
@ApplicationScoped
public class TestService {

    @Inject
    TestEntityRepository repository;

    @Inject
    EventBus eventBus;

    public void startUp(@Observes StartupEvent event) {
        eventBus.sendAndForget("test_topic", "eventbus call");
    }

    @ConsumeEvent(value = "test_topic", blocking = true)
    @Transactional
    public Uni<Void> listEntities(String testMessage) {
        log.info("Printing all entities via: " + testMessage);
        try {
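            // findAll() returns a PanacheQuery, so stream it and log each entity as a String.
            repository.findAll().stream().map(TestEntity::toString).forEach(log::info);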
            return Uni.createFrom().voidItem();
        } catch (Exception e) {
            return Uni.createFrom().failure(e);
        }
    }
}
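
The effect of the blocking flag can be pictured with a plain-JDK analogy. This is a sketch under stated assumptions, not how Quarkus or Vert.x is implemented: two executors stand in for the event loop and the worker pool, the guard in blockingQuery() is a made-up substitute for the real blocking-call check, and every class, method, and thread name is hypothetical. Wrapping the call in a Supplier (much like wrapping it in a Uni) only defers it; which thread runs it is decided by the dispatcher.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Plain-JDK analogy with made-up names; this is not Quarkus or Vert.x code.
final class BlockingDispatchSketch {

    /** Stand-in for a blocking repository call: refuses to run on the fake event loop. */
    static String blockingQuery() {
        String thread = Thread.currentThread().getName();
        if (thread.startsWith("fake-eventloop")) {
            throw new IllegalStateException("blocking call on " + thread);
        }
        return "rows loaded on " + thread;
    }

    /** Runs the handler on the event loop or, when flagged as blocking, on a worker thread. */
    static String dispatch(Supplier<String> handler, boolean blocking) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "fake-eventloop-thread"));
        ExecutorService workers = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "fake-worker-thread"));
        try {
            // The deferred handler itself does not choose a thread; the dispatcher does.
            Callable<String> task = handler::get;
            ExecutorService target = blocking ? workers : eventLoop;
            return target.submit(task).get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "failed: interrupted";
        } catch (TimeoutException e) {
            return "failed: timed out";
        } finally {
            eventLoop.shutdownNow();
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(BlockingDispatchSketch::blockingQuery, false));
        System.out.println(dispatch(BlockingDispatchSketch::blockingQuery, true));
    }
}
```

With blocking set to false the same handler fails on the fake event loop thread; with blocking set to true it completes on the fake worker thread, which is the behaviour the answer relies on.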