I'm using Kafka Streams to read from a topic in my cluster, and I want to filter the messages based on their JSON contents, i.e.:

JSON Format:

{
   "id": 1 
   "timestamp": "2019-04-21 12:53:18", 
   "priority": "Medium", 
   "name": "Sample Text",
   "metadata": [{
      "metric_1": "0", 
      "metric_2": "1", 
      "metric_3": "2"
   }]
}

I want to read the messages from an input topic (let's call it "input-topic"), filter them (let's say I only want messages whose priority is "Low"), aggregate them, and send the result to another topic ("filtered-topic").

I don't have much code yet beyond creating the stream itself and its configuration. I suspect there is something about the Serdes that I need to configure, but I'm not sure how. I also tried a JSON deserializer, but I couldn't get it to work.
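
For reference, my configuration so far is roughly just the standard properties (the application id and broker address below are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Default serdes: keys and values are read as plain strings
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());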

First of all, is this even possible? If so, what would be the correct course of action?

1 Answer

sendon1982

You can build a stream from the topic like this:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KeyValueMapper;
    import org.apache.kafka.streams.kstream.Predicate;
    import org.apache.kafka.streams.kstream.Produced;

    StreamsBuilder builder = new StreamsBuilder();

    // Key and value types are both String here; adjust them to your case
    KStream<String, String> source = builder.stream("input-topic");

    source.filter(new Predicate<String, String>() {
        @Override
        public boolean test(String key, String value) {
            // Your filter logic here; key and value come from the topic.
            // In your case, value is the JSON string to inspect.
            // Return true to keep a record, false to drop it.
            return true;
        }
    }).groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            // Your group-by logic: return the key to group records under
            return key;
        }
    }).count()
      // count() produces Long values, so give the sink topic a Long value serde
      .toStream().to("filtered-topic", Produced.with(Serdes.String(), Serdes.Long()));
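
Since the values arrive as plain JSON strings, one simple option is to keep the String serde and parse the value inside the filter, for example with Jackson. A minimal sketch (the "priority" field comes from your message format; the variable names are mine):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    final ObjectMapper mapper = new ObjectMapper();

    KStream<String, String> lowPriority = source.filter((key, value) -> {
        try {
            JsonNode node = mapper.readTree(value);
            JsonNode priority = node.get("priority");
            // Keep only records whose "priority" field equals "Low"
            return priority != null && "Low".equals(priority.asText());
        } catch (Exception e) {
            // Drop records that are not valid JSON
            return false;
        }
    });

Alternatively, you can plug a custom JSON Serde into the stream so it is typed to a POJO from the start, but parsing inside the filter is usually the quickest way to get going. Either way, you build and start the topology as usual (props being your streams configuration):

    import org.apache.kafka.streams.KafkaStreams;

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();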