In a Quarkus application, I want to consume a Kafka message and persist its information in the database using an Entity Manager.
This is what I got so far:
@ApplicationScoped
public class ClientEventConsumer {
@Inject ClientRepository repository;
private final BlockingQueue<Message<ClientEvent>> messages = new LinkedBlockingQueue<>();
void startup(@Observes StartupEvent startupEvent) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.scheduleAtFixedRate(() -> {
if (messages.size() > 0) {
try {
Message<ClientEvent> message = messages.take();
ClientEvent clientEvent = message.getPayload();
ClientEntity clientEntity = new ClientEntity();
clientEntity.setId(clientEvent.getId());
clientEntity.setName(clientEvent.getName());
repository.merge(clientEntity);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, 1000, 500, TimeUnit.MILLISECONDS);
}
@Incoming("qualificationCheck")
public CompletionStage<Void> consume(Message<ClientEvent> msg) {
messages.add(msg);
return msg.ack();
}
}
But with this approach the message gets acknowledged before the record is actually persisted in the database. Is there a way to only acknowledge the message if the JPA transaction succeeded?