Consuming and filtering cloud events using Kafka Streams


I have an application that consumes CloudEvents, filters each event based on its header type, transforms it, and writes it back to a topic.

I am not able to figure out the configuration needed to consume the CloudEvents from the topic. This is what I have so far:

final Properties streamsConfiguration = new Properties();
streamsConfiguration.putAll(kafkaProperties.buildStreamsProperties());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CloudEvent.class);

KStream<String, CloudEvent> eventStream = builder.stream(inputTopic);

eventStream.foreach((key, value) -> {
    log.info("Key: " + key + ", Value: " + value);
});

I get the following exception.

org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde interface io.cloudevents.CloudEvent
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:1808) ~[kafka-streams-3.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.valueSerde(AbstractProcessorContext.java:100) ~[kafka-streams-3.5.1.jar:na]

Can someone help me figure out the right configuration for the value Serde?
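For reference, here is roughly what I would expect the wiring to look like if I used the stock serializer and deserializer from the io.cloudevents:cloudevents-kafka module instead of the default-serde config (a minimal sketch, not verified against my setup):

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

// DEFAULT_VALUE_SERDE_CLASS_CONFIG must name a class implementing Serde<T>;
// CloudEvent.class is the event type itself, which is why configure() fails.
// Wrapping the serializer/deserializer pair avoids the default-serde path entirely.
Serde<CloudEvent> cloudEventSerde =
        Serdes.serdeFrom(new CloudEventSerializer(), new CloudEventDeserializer());

KStream<String, CloudEvent> eventStream =
        builder.stream(inputTopic, Consumed.with(Serdes.String(), cloudEventSerde));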

Update: Based on the comments, I added a custom Serde implementation, shown below.

public class CloudEventSerde implements Serde<CloudEvent> {

    @Override
    public Serializer<CloudEvent> serializer() {
        return new CloudEventSerializer();
    }

    @Override
    public Deserializer<CloudEvent> deserializer() {
        return new CloudEventDeserializer();
    }

    private static class CloudEventSerializer implements Serializer<CloudEvent> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public byte[] serialize(String topic, CloudEvent data) {
            try {
                // Plain Jackson serialization of the CloudEvent object
                return objectMapper.writeValueAsBytes(data);
            } catch (IOException e) {
                log.error("Error occurred while serializing data", e);
            }
            return null;
        }
    }

    private static class CloudEventDeserializer implements Deserializer<CloudEvent> {
        @Override
        public CloudEvent deserialize(String topic, byte[] data) {
            try {
                // Resolve the JSON event format and deserialize the payload bytes
                EventFormat format = EventFormatProvider
                        .getInstance()
                        .resolveFormat(JsonFormat.CONTENT_TYPE);
                return format.deserialize(data);
            } catch (Exception e) {
                log.error("Error occurred while deserializing data", e);
            }
            return null;
        }
    }
}


public class TestService {

    public Topology buildTopology() {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.putAll(kafkaProperties.buildStreamsProperties());
        log.info("input topic: " + inputTopic);

        KStream<String, CloudEvent> stream =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), new CloudEventSerde()));

        stream.foreach((key, value) -> log.info("Key: " + key + ", Value: " + value));

        return builder.build();
    }
}

With this change, I get an error that the mandatory attribute specversion is missing while deserializing the data into a CloudEvent. The headers seem to have the attribute; I believe the data parameter contains only the event payload and not the header values. How can I get both the headers and the data?
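From what I can tell, the events arrive in binary content mode, where CloudEvent attributes such as specversion travel as Kafka record headers rather than in the payload, so the two-argument deserialize(topic, data) never sees them. Kafka's Deserializer interface also declares a headers-aware overload, deserialize(String topic, Headers headers, byte[] data), and the stock io.cloudevents.kafka.CloudEventDeserializer implements it. A minimal sketch of delegating my Serde to the stock classes (assuming Kafka Streams passes record headers through to the deserializer, which I believe it does in 3.x):

import io.cloudevents.CloudEvent;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class CloudEventSerde implements Serde<CloudEvent> {

    @Override
    public Serializer<CloudEvent> serializer() {
        // Stock serializer from io.cloudevents:cloudevents-kafka.
        return new io.cloudevents.kafka.CloudEventSerializer();
    }

    @Override
    public Deserializer<CloudEvent> deserializer() {
        // The stock deserializer's deserialize(topic, headers, data) overload
        // reassembles binary-mode events from the headers plus the payload.
        return new io.cloudevents.kafka.CloudEventDeserializer();
    }
}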

I got this working with KafkaTemplate instead of Kafka Streams, so I am thinking of going that route. But if someone has suggestions for getting this working with Kafka Streams, I would really appreciate it.
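For reference, the same deserialization also works outside of Streams with the plain consumer API, since the consumer hands record headers to the deserializer. A sketch with placeholder broker address and group id:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cloudevents-consumer");    // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);

// Binary-mode attributes like specversion are reconstructed from the record
// headers because the consumer invokes the headers-aware deserialize overload.
try (KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of(inputTopic));
    ConsumerRecords<String, CloudEvent> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(r -> log.info("Key: " + r.key() + ", Value: " + r.value()));
}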
