Acknowledge message after JPA transaction succeed while consuming from Kafka

160 views Asked by At

In a Quarkus application, I want to consume a Kafka message and persist its information in the database using an Entity Manager.

This is what I got so far:

@ApplicationScoped
public class ClientEventConsumer {

    @Inject ClientRepository repository;

    private final BlockingQueue<Message<ClientEvent>> messages = new LinkedBlockingQueue<>();

    void startup(@Observes StartupEvent startupEvent) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.scheduleAtFixedRate(() -> {
            if (messages.size() > 0) {
                try {
                    Message<ClientEvent> message = messages.take();
                    ClientEvent clientEvent = message.getPayload();
                    ClientEntity clientEntity = new ClientEntity();
                    clientEntity.setId(clientEvent.getId());
                    clientEntity.setName(clientEvent.getName());
                    repository.merge(clientEntity);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 1000, 500, TimeUnit.MILLISECONDS);
    }

    @Incoming("qualificationCheck")
    public CompletionStage<Void> consume(Message<ClientEvent> msg) {
        messages.add(msg);
        return msg.ack();
    }
}

But with this approach the message gets acknowledged before the record is actually persisted in the database. Is there a way to only acknowledge the message if the JPA transaction succeeded?

0

There are 0 answers