Overriding KStreams default serializer (ByteArraySerializer)


I can't seem to override the serializer of a topic to Serdes.String(). I'm trying a simple use case of reading from a topic (stream), and writing to a KTable. What I have so far:

@Component
class Processor {
    @Autowired
    public void process(final StreamsBuilder builder) {
        final Serde<String> stringSerde = Serdes.String();
        builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
                .filter((key, value) -> value.contains("ACTION"))
                .toTable(Materialized.as("output_table_materialized"))
                .toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line

    }
}

The exception I get is:

org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

From what I gather, it recognizes that the key is a String, but it is still using the default serializer, ByteArraySerializer. Where am I going wrong in the above code?

2

There are 2 answers

4
OneCricketeer On

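Consumed.with only sets the deserializers used when reading input_topic.

The exception comes from a serializer, so it is raised on the write side: the toTable call, which materializes the state store. toTable does not take a Produced; pass the serdes through its Materialized argument instead, or set the default key and value serdes in your application properties.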
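
As a rough sketch of the second option (changing the defaults), the snippet below builds the properties with plain java.util.Properties so it runs without Kafka on the classpath. The keys default.key.serde and default.value.serde are the Kafka Streams config names behind StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG. The class name DefaultSerdeConfig, the application id, and the bootstrap address are placeholders and are not taken from the question.

```java
import java.util.Properties;

public class DefaultSerdeConfig {

    // Fully qualified name of the String serde that ships with kafka-clients.
    static final String STRING_SERDE =
            "org.apache.kafka.common.serialization.Serdes$StringSerde";

    // Builds Kafka Streams properties whose default key/value serdes are String,
    // so operators that are given no explicit serdes use String serdes.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "serde-demo");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        props.put("default.key.serde", STRING_SERDE);
        props.put("default.value.serde", STRING_SERDE);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        streamsProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a Spring Boot application the same two keys can usually be set under the spring.kafka.streams.properties.* prefix in application.properties. That is an assumption about the setup, since the question does not show its configuration.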

0
coder34 On

I faced a similar issue and the solution was to specify the serdes on the Materialized instance, i.e. swapping

.toTable(Materialized.as("output_table_materialized"))

with

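.toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("output_table_materialized").withKeySerde(stringSerde).withValueSerde(stringSerde))

The explicit type arguments on Materialized.as are needed because Java cannot infer them through the chained withKeySerde/withValueSerde calls; without them the chain does not compile against a Serde<String>. KeyValueStore is org.apache.kafka.streams.state.KeyValueStore and Bytes is org.apache.kafka.common.utils.Bytes.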