I can't seem to override the serializer of a topic to Serdes.String()
. I'm trying a simple use case of reading from a topic (stream), and writing to a KTable. What I have so far:
@Component
class Processor {
@Autowired
public void process(final StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
.filter((key, value) -> value.contains("ACTION"))
.toTable(Materialized.as("output_table_materialized"))
.toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line
}
}
The exception I get is:
org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
From what I gather, it understands the message is a String
but it's using the default deserializer ByteArraySerializer
. Where am I going wrong in the above code?
The Consumed.with would be a Deserializer.
The error is on the Serializer, or the toTable call, which you may add
Produced.with
or modify your application properties to configure the defaults there