I'm working with a Java application that uses Quarkus and Kafka.
I have a Kafka consumer that reads from a topic, but I need to read some data from a database before it starts consuming.
My consumer looks like this:
    @ApplicationScoped
    public class Consumer {

        private final Logger LOG = Logger.getLogger(Consumer.class);

        @Inject
        ObjectMapper objectMapper;

        @Incoming("topic-in")
        public void consume(ConsumerRecord<String, String> record) throws JsonProcessingException {
            LOG.info(record.key());
            Data myData = objectMapper.readValue(record.value(), Data.class);
            myData.setKey(record.key());
            // Do stuff...
        }
    }
I know that I can use @KafkaListener, as described here (Run a method before starting spring kafka-consumer), but I'd like to keep using Quarkus and the @Incoming annotation.
I also tried to fetch the data from the database using something like void onStartup(@Observes StartupEvent startupEvent), but without luck.
Based on what I've read, the SmallRye docs mention "unmanaged streams", which sounds like what you want, but I can't find clear documentation of what that means.
Alternatively, you would need to use KafkaClientService and access the consumer directly; you cannot use @Incoming, as this will always start consuming data immediately. See https://quarkus.io/guides/kafka#kafka-bare-clients
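To illustrate why direct access to the consumer helps, here is a minimal sketch in plain Java of the ordering it makes possible: load the reference data first, and only then start the poll loop. Nothing in it is Quarkus or Kafka API. The class LoadThenPoll, the methods loadReferenceData and pollLoop, and the in-memory queue standing in for the topic are all hypothetical names invented for this example. In a real application the queue would be the directly managed Kafka consumer and the map would come from your database query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: none of these names come from Quarkus or Kafka.
class LoadThenPoll {

    // Stand-in for the database query that must finish before consuming.
    static Map<String, String> loadReferenceData() {
        return Map.of("k1", "alpha", "k2", "beta");
    }

    // Stand-in for a poll loop over a consumer you manage yourself.
    // The queue plays the role of the topic; each entry is {key, value}.
    static List<String> pollLoop(BlockingQueue<String[]> topic,
                                 Map<String, String> referenceData,
                                 int expectedRecords) {
        List<String> processed = new ArrayList<>();
        while (processed.size() < expectedRecords) {
            String[] record;
            try {
                record = topic.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (record == null) {
                break; // nothing arrived within the timeout; end the sketch
            }
            // The reference data is guaranteed to be loaded at this point.
            String extra = referenceData.getOrDefault(record[0], "unknown");
            processed.add(record[0] + "=" + record[1] + " (" + extra + ")");
        }
        return processed;
    }

    public static void main(String[] args) {
        // Records may already be waiting on the topic before startup finishes.
        BlockingQueue<String[]> topic = new LinkedBlockingQueue<>();
        topic.add(new String[] {"k1", "v1"});
        topic.add(new String[] {"k3", "v3"});

        // Step 1: load from the "database".
        Map<String, String> referenceData = loadReferenceData();

        // Step 2: only now start polling, because we control when the loop begins.
        for (String line : pollLoop(topic, referenceData, 2)) {
            System.out.println(line);
        }
    }
}
```

The point of the design is that the code which starts the loop is yours, so waiting records stay on the topic until the data is ready.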
Another option would be to use Kafka Connect / Debezium to read the data into its own Kafka topic, then use Kafka Streams to build whatever database view you would otherwise have queried at startup.