Quarkus Kafka Consumer: read from database before starting consuming


I'm working on a Java application that uses Quarkus and Kafka.

I have a Kafka consumer that reads from a topic, but I need to read some data from a database before it starts consuming.

My consumer looks like this:


@ApplicationScoped
public class Consumer {

    private static final Logger LOG = Logger.getLogger(Consumer.class);
    
    @Inject
    ObjectMapper objectMapper;
   
    @Incoming("topic-in")
    public void consume(ConsumerRecord<String, String> record) throws JsonProcessingException {
        LOG.info(record.key());

        Data myData = objectMapper.readValue(record.value(), Data.class);
        myData.setKey(record.key());

        // Do stuff...
    }
}

I know that I can use @KafkaListener as described here (Run a method before starting spring kafka-consumer), but I'd like to keep using Quarkus and the @Incoming annotation.

I also tried to fetch the data from the database at startup using something like void onStartup(@Observes StartupEvent startupEvent), but without luck.


There are 2 answers

2
OneCricketeer On

Based on what I've read, SmallRye docs mention "unmanaged streams", which sounds like something that you want, but I see no clear documentation for what this means.

Alternatively, you would need to use KafkaClientService, and access the consumer directly; you cannot use @Incoming, as this will always start consuming data immediately.

https://quarkus.io/guides/kafka#kafka-bare-clients

Another option would be to use Kafka Connect / Debezium to read the data into its own Kafka topic, then use Kafka Streams to build whatever database view you would otherwise have queried at startup.

0
Kaiak On

I was able to get my initial solution to work.

This is how I set up the class responsible for loading the data, using the @Observes annotation:

@ApplicationScoped
public class DataLoader {

    @Inject
    EntityManager entityManager;

    protected void onStartup(@Observes StartupEvent startupEvent) {
        List<Data> result = entityManager.createNamedQuery("selectAll", Data.class).getResultList();
        // Do stuff...
    }
}

The Consumer is the same as in my original question.
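
If the ordering between the startup load and the first consumed message needs to be made explicit rather than left to timing, one possible approach is to have the consumer wait on a latch that the loader opens once the data is available. The following is only a plain-Java sketch of that idea, not Quarkus code: GatedConsumerSketch, ReferenceData and GatedConsumer are hypothetical names, load() stands in for the body of the @Observes StartupEvent method, and consume() stands in for the @Incoming method. In a real Quarkus application, a blocking wait inside an @Incoming method would probably also need the @Blocking annotation so that it does not run on an event-loop thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch: the consumer blocks on a latch until the startup loader has finished.
// All class names are hypothetical and only illustrate the gating idea.
final class GatedConsumerSketch {

    /** Holds the data loaded at startup and the latch that signals it is ready. */
    static final class ReferenceData {
        private final CountDownLatch ready = new CountDownLatch(1);
        private volatile List<String> rows = List.of();

        /** Called once at startup, e.g. with the result of the database query. */
        void load(List<String> loadedRows) {
            rows = List.copyOf(loadedRows);
            ready.countDown(); // opens the gate for every waiting consumer
        }

        /** Blocks until load() has run; fails if the timeout expires first. */
        List<String> await(long timeout, TimeUnit unit) {
            try {
                if (!ready.await(timeout, unit)) {
                    throw new IllegalStateException("Reference data was not loaded in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while waiting for reference data", e);
            }
            return rows;
        }
    }

    /** Stand-in for the class that carries the @Incoming method. */
    static final class GatedConsumer {
        private final ReferenceData referenceData;
        final List<String> processed = new CopyOnWriteArrayList<>();

        GatedConsumer(ReferenceData referenceData) {
            this.referenceData = referenceData;
        }

        void consume(String key, String value) {
            // Wait (up to 5 seconds) for the startup data before touching the record.
            List<String> rows = referenceData.await(5, TimeUnit.SECONDS);
            processed.add(key + "=" + value + " (rows known: " + rows.size() + ")");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReferenceData referenceData = new ReferenceData();
        GatedConsumer consumer = new GatedConsumer(referenceData);

        // A record may arrive before the data is loaded; the consuming thread then waits.
        Thread consuming = new Thread(() -> consumer.consume("k1", "v1"));
        consuming.start();

        referenceData.load(List.of("row-a", "row-b")); // simulated database result
        consuming.join();
        System.out.println(consumer.processed);
    }
}
```

The timeout is there so that a failed or missing load surfaces as an error instead of stalling the consumer forever; whether failing or waiting indefinitely is the better behaviour depends on the application.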