I have an application that consumes CloudEvents from a Kafka topic, filters them based on the type header, transforms each event, and writes it back to the topic.
I can't figure out the configuration needed to consume the CloudEvents from the topic. This is what I have so far:
final Properties streamsConfiguration = new Properties();
streamsConfiguration.putAll(kafkaProperties.buildStreamsProperties());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CloudEvent.class);
KStream<String,CloudEvent> eventStream = builder.stream(inputTopic);
eventStream.foreach((key, value) -> {
    log.info("Key: " + key + ", Value" + value);
});
I get the following exception:
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde interface io.cloudevents.CloudEvent
at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:1808) ~[kafka-streams-3.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.valueSerde(AbstractProcessorContext.java:100) ~[kafka-streams-3.5.1.jar:na]
Can someone help me figure out the correct configuration for the value Serde?
Update: Based on a comment, I added a custom Serde implementation, shown below:
public class CloudEventSerde implements Serde<CloudEvent> {

    @Override
    public Serializer<CloudEvent> serializer() {
        return new CloudEventSerializer();
    }

    @Override
    public Deserializer<CloudEvent> deserializer() {
        return new CloudEventDeserializer();
    }

    private static class CloudEventSerializer implements Serializer<CloudEvent> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        // @SneakyThrows rethrows a JSON processing failure instead of swallowing it.
        @SneakyThrows
        @Override
        public byte[] serialize(String topic, CloudEvent data) {
            return objectMapper.writeValueAsBytes(data);
        }
    }

    private static class CloudEventDeserializer implements Deserializer<CloudEvent> {
        @Override
        public CloudEvent deserialize(String topic, byte[] data) {
            try {
                // Parses only the record value, treating it as a JSON-encoded CloudEvent.
                EventFormat format = EventFormatProvider
                        .getInstance()
                        .resolveFormat(JsonFormat.CONTENT_TYPE);
                return format.deserialize(data);
            } catch (Exception e) {
                log.error("Error occurred while deserializing data", e);
            }
            return null;
        }
    }
}
public class testService {
    public Topology buildTopology() {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.putAll(kafkaProperties.buildStreamsProperties());
        log.info("input topic: " + inputTopic);
        KStream<String, CloudEvent> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(), new CloudEventSerde()));
        stream.foreach((key, value) -> {
            log.info("Key: " + key + ", Value" + value);
        });
        // The method declares a Topology return type; this assumes builder is a StreamsBuilder field.
        return builder.build();
    }
}
With this change, deserialization fails with an error saying that the mandatory attribute specversion is missing. The record headers do seem to contain that attribute, so I believe the data parameter passed to deserialize contains only the event's data and none of the header values. How can I get both the headers and the data?
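To illustrate what I suspect is happening, here is a minimal plain-JDK sketch. It assumes the producer writes events in the binary content mode of the CloudEvents Kafka binding, where context attributes travel as record headers prefixed with ce_ and the record value carries only the event data. The class name BinaryModeSketch, the attributes helper, and the sample header values are hypothetical and only for illustration; plain maps stand in for Kafka's header types.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-JDK sketch of how a binary-mode CloudEvent is split across a Kafka record. */
public class BinaryModeSketch {

    // Prefix used for attribute headers in binary content mode.
    static final String CE_PREFIX = "ce_";

    /** Collects the context attributes carried in ce_-prefixed headers. */
    static Map<String, String> attributes(Map<String, byte[]> headers) {
        Map<String, String> attrs = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            if (h.getKey().startsWith(CE_PREFIX)) {
                attrs.put(h.getKey().substring(CE_PREFIX.length()),
                          new String(h.getValue(), StandardCharsets.UTF_8));
            }
        }
        return attrs;
    }

    public static void main(String[] args) {
        // Hypothetical record: attributes are in the headers, the value is only the data.
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8));
        headers.put("ce_type", "com.example.order.created".getBytes(StandardCharsets.UTF_8));
        headers.put("content-type", "application/json".getBytes(StandardCharsets.UTF_8));
        byte[] value = "{\"orderId\":42}".getBytes(StandardCharsets.UTF_8);

        System.out.println("attributes: " + attributes(headers));
        System.out.println("data: " + new String(value, StandardCharsets.UTF_8));
        // A parser that only sees the value gets {"orderId":42}, which has no specversion.
    }
}
```

If that assumption holds, a value-only deserialize(String topic, byte[] data) can never see specversion. Kafka's Deserializer interface also has a deserialize(String topic, Headers headers, byte[] data) overload, so a deserializer that overrides it could read the headers as well; I have not verified whether the CloudEventDeserializer from the cloudevents-kafka module can be plugged into a Serde this way.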
I got this working with KafkaTemplate instead of Kafka Streams, so I am thinking of going that route. But if anyone has suggestions for getting this working with Kafka Streams, I would really appreciate it.