I am currently working on a Kafka Streams solution for retrieving user browsing sessions using `SessionWindows`. My topology looks like this:
```scala
builder
  .stream(...)
  .map(... => (newKey, value))
  .groupByKey(...)
  .windowedBy(SessionWindows.`with`(INACTIVITY_GAP).grace(GRACE))
  .aggregate(... into list of events)
  .suppress(Suppressed.untilWindowCloses(unbounded()))
```
This simple scenario works well for me; however, I need to add some extra checks to the suppression logic. Namely, I would like to force-flush every session that exceeds a given size (e.g. every session that contains more than 1000 events, all produced within the inactivity gap). How could this be implemented?
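To make the requirement concrete, here is a simplified model of the output I am after, written as plain Scala without Kafka Streams. It is a sketch under stated assumptions, not Kafka's actual merging logic: events for a single key arrive in timestamp order, grace and late records are ignored, and once a session is force-flushed the next event starts a new session even if it falls within the inactivity gap. The names `Session`, `SessionModel`, `sessionize` and `maxEvents` are hypothetical. Passing `Int.MaxValue` for `maxEvents` roughly corresponds to the plain inactivity-gap sessions the topology above already produces.

```scala
// Simplified, stdlib-only model of session windowing for ONE key (not Kafka Streams code).
// Assumptions: timestamps are sorted, grace and late records are ignored, and a
// force-flushed session is followed by a fresh session even within the gap.
final case class Session(start: Long, end: Long, count: Int)

object SessionModel {
  def sessionize(timestamps: Seq[Long], gap: Long, maxEvents: Int): Vector[Session] = {
    require(gap >= 0 && maxEvents >= 1, "gap must be non-negative and maxEvents positive")
    val (done, open) = timestamps.foldLeft((Vector.empty[Session], Option.empty[Session])) {
      case ((acc, None), t) =>
        (acc, Some(Session(t, t, 1))) // first event, or first event after a forced flush
      case ((acc, Some(cur)), t) if t - cur.end <= gap =>
        val merged = cur.copy(end = t, count = cur.count + 1)
        // "More than maxEvents" events: flush immediately instead of waiting for the gap.
        if (merged.count > maxEvents) (acc :+ merged, None) else (acc, Some(merged))
      case ((acc, Some(cur)), t) =>
        (acc :+ cur, Some(Session(t, t, 1))) // inactivity gap exceeded: close the session
    }
    done ++ open.toList // emit whatever is still open at the end of the input
  }
}
```

With `maxEvents = 2`, five events at timestamps 0 to 4 end up in two sessions, `Session(0, 2, 3)` and `Session(3, 4, 2)`: the first one is flushed as soon as it holds more than two events, even though the gap was never exceeded.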
I know that the `.suppress()` method does not accept a custom implementation of `Suppressed`. Therefore I was thinking about replacing `.suppress()` with `.transform()` and a custom `Transformer` backed by a `SessionStore`, which could perform the suppression logic and also apply these additional checks. However, I am having a hard time adding and deleting entries in the store and implementing the basic "untilWindowCloses" suppression myself: I could probably do the periodic flush through `ProcessorContext.schedule()`, but `SessionStore` does not provide a way to iterate through all keys.
Is this a good direction? Are there any other ways to add size constraints to the sessions?