Tumbling window concept kafka streams

3.2k views Asked by At

I am learning Tumbling window concepts in kafka streams. So, based on code mentioned in https://github.com/timothyrenner/kafka-streams-ex/tree/master/tumbling-window I created my jars and am running them on my Linux VM (i.e. both Producer and Consumer code are running on my VM). I have a question here (chances are my understanding is incorrect). So, when I run kafka-console-consumer on topics longs (on which producer is producing data) and long-counts-all (on which final output is rendered by kafka streams code), I see that for every 6 (k,v) records produced on topic longs, 3 (k,v) records are output to long-counts-all (output topic). For example:

longs:

A       -3
A       2
A       1
A       -7
A       -1

A       1

long-counts-all:

A       2
A       2
A       2

I have 2 questions:

1) Though my tumbling time is TimeWindows.of(1000L) I am unable to understand the reason as to why the count of 'A' is produced on topic - long-counts-all for every 30 seconds time frame whereas it should have been for 10 secs window.

2) Also, I am guessing that the count of key 'A' is 2 for every 10 secs and that is the reason as to why the output in topic - long-counts-all is 'A 2'. Please correct me if any wrong.

Producer:

public class Producer {
    public static void main(String[] args) {
        // Now generate the data and write to the topic.
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", "localhost:9092");
        producerConfig.put("key.serializer",
                "org.apache.kafka.common" +
                        ".serialization.ByteArraySerializer");
        producerConfig.put("value.serializer",
                "org.apache.kafka.common" +
                        ".serialization.LongSerializer");

        KafkaProducer producer =
                new KafkaProducer<byte[], Long>(producerConfig);

        Random rng = new Random(12345L);
        try {
            while (true) {
                producer.send(new ProducerRecord<byte[], Long>(
                        "longs", "A".getBytes(), rng.nextLong() % 10));
                Thread.sleep(5000L);
            }
        } catch(Exception e)   {
            e.printStackTrace();
        }
    }
}

Consumer:

public class Consumer {
    public static void main(String[] args) {
        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "tumbling-window-kafka-streams");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.ByteArray().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.Long().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();

        KStream<byte[], Long> longs = builder.stream(
                Serdes.ByteArray(), Serdes.Long(), "longs");

        // The tumbling windows will clear every ten seconds.
        KTable<Windowed<byte[]>, Long> longCounts =
                longs.groupByKey()
                        .count(TimeWindows.of(10000L)
                                        .until(10000L),
                                "long-counts");

        // Write to topics.
        longCounts.toStream((k,v) -> k.key())
                .to(Serdes.ByteArray(),
                        Serdes.Long(),
                        "long-counts-all");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}   
0

There are 0 answers