My Kafka topic contains statuses keyed by deviceId. I would like to use KStreamBuilder.stream().groupByKey().aggregate(...) to keep only the latest value of a status in a TimeWindow. I guess that, as long as the topic is partitioned by key, the aggregation function can always return the latest value in this fashion:
(key, value, older_value) -> value
Is this a guarantee I can expect from Kafka Streams? Should I roll my own processing method that checks the timestamp?
Kafka Streams guarantees ordering by offset, not by timestamp. Thus, by default, the "last update wins" policy is based on offsets, not on timestamps. Late-arriving records ("late" as defined by their timestamps) are out of order with respect to timestamps, and they are not reordered: they are processed in their original offset order.
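To make the difference concrete, here is a minimal plain-Java sketch (no Kafka dependency) of what the aggregator from the question does when a record with an older timestamp arrives later in offset order. The Status class and the lastByOffset helper are hypothetical names used only for this illustration; they are not part of Kafka Streams.

```java
import java.util.Arrays;
import java.util.List;

class OffsetOrderDemo {
    // Hypothetical record type: a status value plus its event timestamp.
    static final class Status {
        final String value;
        final long timestamp;

        Status(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Mimics the aggregator from the question, applied in offset order:
    // (key, value, olderValue) -> value
    static Status lastByOffset(List<Status> recordsInOffsetOrder) {
        Status aggregate = null;
        for (Status incoming : recordsInOffsetOrder) {
            aggregate = incoming; // always keep the newly arrived value
        }
        return aggregate;
    }

    public static void main(String[] args) {
        // "ON" carries the later timestamp but was written to the topic first.
        List<Status> records = Arrays.asList(
                new Status("ON", 200L),
                new Status("OFF", 100L));
        System.out.println(lastByOffset(records).value); // prints OFF
    }
}
```

The result is the value with the highest offset ("OFF"), even though "ON" has the later timestamp.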
If you want your window to contain the latest value based on timestamps, you will need to use the Processor API (PAPI) to make this work.
Within Kafka Streams' DSL, you cannot access the record timestamp that is required to get the correct result. An easy way might be to put a .transform() before .groupBy() and add the timestamp to the record (i.e., its value) itself. Thus, you can use the timestamp within your Aggregator (btw: a .reduce(), which is simpler to use, might also work instead of .aggregate()). Finally, you need to do a .mapValues() after your .aggregate() to remove the timestamp from the value again.
Using this mix-and-match approach of DSL and PAPI should simplify your code, as you can use the DSL's windowing support and KTable and do not need to do low-level time-window and state management.
Of course, you can also just do all this in a single low-level stateful processor, but I would not recommend it.
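As a rough illustration of the wrap, reduce, unwrap idea, the following plain-Java sketch shows only the logic that would go into each step; it deliberately avoids the Kafka Streams API, so the wiring into .transform(), .reduce() and .mapValues() is left out. The names Timestamped, reduce, strip and latest are assumptions made for this example, and ties on equal timestamps are resolved in favor of the newer arrival, which is one possible choice.

```java
import java.util.Arrays;
import java.util.List;

class LatestByTimestamp {
    // Hypothetical wrapper: the value after the timestamp has been attached
    // (the role of the .transform() step).
    static final class Timestamped {
        final String value;
        final long timestamp;

        Timestamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Logic for the reduce step: keep the record with the larger timestamp.
    // On equal timestamps the incoming record (higher offset) wins.
    static Timestamped reduce(Timestamped aggregate, Timestamped incoming) {
        return incoming.timestamp >= aggregate.timestamp ? incoming : aggregate;
    }

    // Logic for the final mapValues step: drop the timestamp again.
    static String strip(Timestamped t) {
        return t.value;
    }

    // Applies the reducer to records in arrival (offset) order.
    static String latest(List<Timestamped> recordsInOffsetOrder) {
        Timestamped aggregate = null;
        for (Timestamped incoming : recordsInOffsetOrder) {
            aggregate = (aggregate == null) ? incoming : reduce(aggregate, incoming);
        }
        return aggregate == null ? null : strip(aggregate);
    }

    public static void main(String[] args) {
        // "OFF" arrives last but has the older timestamp, so "ON" is kept.
        List<Timestamped> records = Arrays.asList(
                new Timestamped("ON", 200L),
                new Timestamped("OFF", 100L));
        System.out.println(latest(records)); // prints ON
    }
}
```

In a real topology this reducer would run per key and per window, so the DSL still takes care of windowing and state.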