I am learning tumbling window concepts in Kafka Streams. Based on the code at https://github.com/timothyrenner/kafka-streams-ex/tree/master/tumbling-window, I built my jars and am running them on my Linux VM (both the Producer and the Consumer run on the VM). I have a question (chances are my understanding is incorrect). When I run kafka-console-consumer on the topic longs
(to which the producer writes) and on long-counts-all
(to which the Kafka Streams code writes its final output), I see that for every 6 (k,v) records produced on longs, 3 (k,v) records are written to long-counts-all (the output topic). For example:
longs:
A -3
A 2
A 1
A -7
A -1
A 1
long-counts-all:
A 2
A 2
A 2
I have 2 questions:
1) My tumbling window is TimeWindows.of(10000L), i.e. 10 seconds.
I do not understand why the counts for 'A' appear on long-counts-all
once every 30 seconds rather than once per 10-second window.
2) I am guessing that the count of key 'A' is 2 in every 10-second window, and that this is why each record on long-counts-all
is 'A 2'. Please correct me if I am wrong.
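The arithmetic behind question 2 can be checked with a small stdlib-only sketch. It assumes, as Kafka's documentation describes for tumbling windows, that windows are aligned to the epoch, so a record's window starts at its timestamp rounded down to a multiple of the window size. The class and method names (`TumblingWindowSketch`, `windowStart`, `countPerWindow`) are hypothetical, and the timestamps are idealized: it assumes records arrive exactly 5 seconds apart, starting on a window boundary.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    // Window size used in the Consumer code: 10 seconds.
    static final long WINDOW_SIZE_MS = 10_000L;
    // Sleep between records in the Producer code: 5 seconds.
    static final long SEND_INTERVAL_MS = 5_000L;

    // Epoch-aligned tumbling window: round the timestamp down
    // to a multiple of the window size.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Count how many of n evenly spaced records fall into each window,
    // keyed by window start time (sorted).
    static Map<Long, Long> countPerWindow(long firstTimestampMs, int n) {
        Map<Long, Long> counts = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            long ts = firstTimestampMs + i * SEND_INTERVAL_MS;
            counts.merge(windowStart(ts, WINDOW_SIZE_MS), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumption: the first record is sent exactly on a window boundary.
        Map<Long, Long> counts = countPerWindow(1_000_000L, 6);
        counts.forEach((start, count) -> System.out.println(
            "window [" + start + ", " + (start + WINDOW_SIZE_MS) + ") -> A " + count));
    }
}
```

With this aligned start, six records sent 5 seconds apart fall into three 10-second windows with two records each. If the first record instead lands late in a window (for example, 7 seconds in), the same six records spread over four windows with counts 1, 2, 2, 1.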
Producer:
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {
        // Configure a producer with byte[] keys and Long values.
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", "localhost:9092");
        producerConfig.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConfig.put("value.serializer",
            "org.apache.kafka.common.serialization.LongSerializer");

        Random rng = new Random(12345L);
        // try-with-resources closes (and flushes) the producer on exit.
        try (KafkaProducer<byte[], Long> producer =
                 new KafkaProducer<>(producerConfig)) {
            // Now generate the data: one record with key "A" every 5 seconds.
            while (true) {
                producer.send(new ProducerRecord<byte[], Long>(
                    "longs", "A".getBytes(), rng.nextLong() % 10));
                Thread.sleep(5000L);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
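The negative values in the longs sample come from the expression `rng.nextLong() % 10`: Java's `%` takes the sign of the dividend, so the results lie in the range -9 to 9. The values themselves do not affect the window counts, since `count` only counts records per key. The sketch below reproduces the expression with stdlib only; `ProducerValues` and `firstValues` are hypothetical names.

```java
import java.util.Random;

public class ProducerValues {
    // Mirrors the value expression in the Producer: rng.nextLong() % 10.
    // Because % keeps the sign of the dividend, results are in [-9, 9].
    static long[] firstValues(long seed, int n) {
        Random rng = new Random(seed);
        long[] values = new long[n];
        for (int i = 0; i < n; i++) {
            values[i] = rng.nextLong() % 10;
        }
        return values;
    }

    public static void main(String[] args) {
        // Same seed as the Producer.
        for (long v : firstValues(12345L, 6)) {
            System.out.println("A " + v);
        }
    }
}
```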
Consumer:
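import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class Consumer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,
            "tumbling-window-kafka-streams");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.ByteArray().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.Long().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<byte[], Long> longs = builder.stream(
            Serdes.ByteArray(), Serdes.Long(), "longs");

        // Count records per key in 10-second tumbling windows;
        // until() keeps each window's state for 10 seconds.
        KTable<Windowed<byte[]>, Long> longCounts =
            longs.groupByKey()
                 .count(TimeWindows.of(10000L)
                                   .until(10000L),
                        "long-counts");

        // Write to topics. k.key() drops the window from the key,
        // so counts from different windows look identical in the output.
        longCounts.toStream((k, v) -> k.key())
                  .to(Serdes.ByteArray(),
                      Serdes.Long(),
                      "long-counts-all");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        // Close the Streams instance cleanly on Ctrl+C.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}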
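One setting relevant to question 1: Kafka Streams documents `commit.interval.ms` with a default of 30000 ms, and since version 0.10.1 KTable results (including windowed counts) pass through a record cache sized by `cache.max.bytes.buffering`, which is flushed downstream on commit. That would plausibly explain a batch of three records, one per 10-second window, arriving every 30 seconds. The sketch below is a hypothetical helper, stdlib only, that copies the Consumer's `Properties` and overrides both settings using their string names (the values of `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` and `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`); treat it as an assumption to test against your Kafka version rather than a definitive fix.

```java
import java.util.Properties;

public class StreamsTuning {
    // Hypothetical helper: returns a copy of the Streams config with
    // settings that make each count update visible sooner.
    static Properties withEagerOutput(Properties base) {
        Properties config = new Properties();
        config.putAll(base);
        // Commit (and flush the record cache) every second
        // instead of the 30-second default.
        config.put("commit.interval.ms", "1000");
        // Disable the record cache so every update is forwarded downstream.
        config.put("cache.max.bytes.buffering", "0");
        return config;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "tumbling-window-kafka-streams");
        Properties tuned = withEagerOutput(base);
        System.out.println(tuned.getProperty("commit.interval.ms"));
        System.out.println(tuned.getProperty("cache.max.bytes.buffering"));
    }
}
```

With caching disabled, every incoming record would produce an updated count (A 1, then A 2 for the same window), so the output topic would show intermediate values rather than one final value per window.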