Flink Idle State Retention based on event time

This might be a simple question to answer, but I couldn't find it stated explicitly in the docs: is Flink's idle state retention computed based on event time when using StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), or is it always based on processing time?

My situation is very similar to what the Idle State Retention Time documentation describes, so I'll use that as the example. I'm computing clicks per session over time:

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

and expect sessions to eventually become inactive, which is where I'd like the idle state retention policy to kick in. The only difference from the example is my source: I've written a custom source function that, on job startup, first reads historic events from S3 (the last N days of data) before switching over to Kafka for new incoming events. Say I set idle state retention to 72 hours and process the past month's worth of data from S3. I'd expect the state size to stabilize at roughly 3 days' worth of data while the job works its way through the S3 backlog, as inactive sessions get removed. In reality, the state keeps growing the entire time the job is processing the last month's worth of data.

Unfortunately, the time windows I'm actually working with are much longer (idle state retention is currently set to 20 days), so I haven't yet had a chance to see whether the state shrinks once the job has been running that long in processing time. It's also possible I'm doing something wrong in my source function that prevents the idle state retention cleanup from working, so any help would be appreciated.

1 answer

David Anderson (best answer)

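The idle state retention time for Flink SQL is based on processing time, regardless of the time characteristic set on the StreamExecutionEnvironment. That matches what you're seeing: if the month of S3 data is replayed in less than 72 hours of wall-clock time, no session has been idle for 72 hours yet, so nothing is cleaned up during the backfill.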
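To make the effect concrete, here is a minimal toy simulation in plain Java. It does not use Flink and is not how Flink implements cleanup; the class name IdleRetentionSketch, the replay method, the one-session-per-hour workload, and the 10 ms per event processing cost are all assumptions made up for illustration. The EVENT_TIME branch is hypothetical and only models what the question expected to happen.

```java
import java.util.HashMap;
import java.util.Map;

public class IdleRetentionSketch {

    /** Which clock decides that a key has been idle for too long. */
    public enum Clock { PROCESSING_TIME, EVENT_TIME }

    /**
     * Replays one event per hour of event time for the given number of days,
     * each event belonging to a new session, and returns how many sessions
     * are still held in state after the replay.
     *
     * wallMillisPerEvent is the simulated wall-clock cost of one event
     * (an assumption; a real backfill speed depends on the job).
     */
    public static int replay(Clock clock, int days, long retentionHours, long wallMillisPerEvent) {
        final long hourMillis = 3_600_000L;
        final long retentionMillis = retentionHours * hourMillis;

        // sessionId -> timestamp of the last access, measured on the chosen clock
        Map<String, Long> lastAccess = new HashMap<>();
        long wallClock = 0L;

        for (int i = 0; i < days * 24; i++) {
            long eventTime = i * hourMillis;   // timestamp carried by the record
            wallClock += wallMillisPerEvent;   // time that actually passed while processing

            long now = (clock == Clock.EVENT_TIME) ? eventTime : wallClock;
            lastAccess.put("session-" + i, now);

            // Drop every session that has been idle for at least the retention time.
            final long cutoff = now - retentionMillis;
            lastAccess.values().removeIf(ts -> ts <= cutoff);
        }
        return lastAccess.size();
    }

    public static void main(String[] args) {
        // 30 days of history, 72 hours of retention, 10 ms of wall-clock time per event.
        System.out.println("processing time: " + replay(Clock.PROCESSING_TIME, 30, 72, 10));
        System.out.println("event time (hypothetical): " + replay(Clock.EVENT_TIME, 30, 72, 10));
    }
}
```

With these assumed numbers the whole backfill takes only a few seconds of wall-clock time, so the processing-time run ends with all 720 sessions still in state, while the hypothetical event-time run levels off at 72 sessions (3 days' worth). Once the job has caught up and has been running longer than the retention time, processing-time cleanup should start removing idle sessions.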