"ConfigException: Please specify a key serde or set one" although I've specified it and also set a default one in my Spring Boot + Kafka Stream app

27 views Asked by At

Here is my Java code to implement a simple stream processing logic:

package com.test.processstrings.consumer;

import com.test.processstrings.domain.EmailEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Service
@EnableKafkaStreams
public class KafkaStreamsService {

    @Autowired
    private StreamsBuilder streamsBuilder;

    @Bean
    public KafkaStreams kafkaStreams() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());


        Map<String, Object> serdeProps = new HashMap<>();
        final JsonSerde<EmailEvent> emailEventJsonSerde = new JsonSerde<>();
        emailEventJsonSerde.configure(serdeProps, false);

        KStream<String, EmailEvent> emailStream = streamsBuilder
                .stream("input-topic", Consumed.with(Serdes.String(), emailEventJsonSerde));

        // Count unique emails
        emailStream.groupByKey()
                .count()
                .toStream()
                .to("output-one", Produced.with(Serdes.String(), Serdes.Long()));

        // Count unique domains
        emailStream.groupBy((key, value) -> value.getEmail().split("@")[1])
                .count()
                .toStream()
                .to("output-two", Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(streamsBuilder.build(), props);
    }
}

In my configuration I just declare three beans, one for each topic. The topics are correctly created as I can list them from the console (the app starts but then shuts down quickly). The EmailEvent just has the one "email" String field plus setters, getters, constructor and no arguments constructor.

But, I keep getting the error:

stream-client Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG

It may be a silly mistake but I have been unable to fix this in any way! Has anyone encountered this / understood what I am doing wrong?

I tried to comment on the output stream creation, but the error persisted.

0

There are 0 answers