Using spring boot kafka template to receive messages

1.3k views Asked by At

In the context of Using Kafka with Spring boot I found a lot of examples of producing messages and receiving messages but I'm doing building a test framework to use in my code and I would like to explore more this point from the documentation:

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/template-receive.html

I basically want a simple way to fetch data from a topic in a synchronous way but every time I try to use my KafkaTemplate to receive messages

kafkaTemplate.receive(config.getTopic(), 0, 0)

It gives me this error: enter image description here

Method threw 'java.lang.IllegalArgumentException' exception.

A consumerFactory is required

I really just interested in understanding if there is any good example of how to use this. There is maybe not much value in what I'm trying to do but I really just want to have some tests in my application that I can ensure I'm producing messages into Kafka, in the right topics, etc... and I want to have a simple test methods for this without building a batch consumer just for testing. If this methods of "receive" exist in the KafkaTemplate I should be able to use them but somehow I haven't found a good example on how to do it.

UPDATE: I've seen I probably need to define a Consumer Factory, just guessing if there is a easy way to copy the configs from the producer configs

    ProducerFactory<String, byte[]> producerFactory = kafkaTemplate.getProducerFactory();
    Map<String, Object> props = producerFactory.getConfigurationProperties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-receive");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    kafkaTemplate.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));

I think I'm in the right path but I still can't really build the consumer from the template:

Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at app//org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at app//org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394)
    at app//org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371)
    at app//org.springframework.kafka.core.KafkaTemplate.receive(KafkaTemplate.java:574)
    at app//org.springframework.kafka.core.KafkaOperations.receive(KafkaOperations.java:295)

UPDATE2: my problem was definitely missing the kafkaTemplate.setConsumerFactory Also like mentioned in the comments the second error I got was from copying the producerConfigs into the consumerConfigs (I had an interceptor in my producer configs)

1

There are 1 answers

0
Miguel Costa On

I think after a bit of investigation and also the answer in the comments kafkaTemplate.receive is not really what I should be using. Mainly because it really always consumes one record at the time.

    @Override
    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
        Properties props = oneOnly();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
            requested.forEach(tpo -> {
                if (tpo.getOffset() == null || tpo.getOffset() < 0) {
                    throw new KafkaException("Offset supplied in TopicPartitionOffset is invalid: " + tpo);
                }
                ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
                List<ConsumerRecord<K, V>> consumerRecords = records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>());
                if (one != null) {
                    consumerRecords.add(one);
                }
            });
            return new ConsumerRecords<>(records);
        }
    } 

Probably this is more what I'm looking for https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/test/utils/KafkaTestUtils.html