Hot key handling in Flink


I have a stream of events for which I want to track how properties change over time at the customer ID level. A sample event looks like {"customer_ID": 1, "property_1": "val_1", "property_2": "val_2"}, and the expected output would be something like {"customer_ID": 1, "property_1_list": "val_1, val_3, …", "property_2_list": "val_2, val_5, …"}. This output is stored in a remote database, so my Flink app needs to retrieve the previous value from the DB first and then update it.

The tricky part is that some customer IDs have many more events than others, so they become hot keys and leave some Flink subtasks constantly busy. One potential optimization I found is pre-shuffle (local) aggregation. However, the set of hot customer IDs can change over time, so how do I dynamically detect hot keys and aggregate them more aggressively while leaving regular keys untouched?
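Pre-shuffle aggregation for hot keys is often implemented as key salting: spread each hot key across N salted sub-keys so its events land on several subtasks, aggregate partials per salted key, then re-key by the original ID and merge the partials. A minimal sketch of the two re-keying steps, with illustrative names (`KeySalting`, `SALT_BUCKETS` are assumptions, not Flink API):

```java
import java.util.concurrent.ThreadLocalRandom;

class KeySalting {
    // Number of salt buckets a hot key is spread across; illustrative value.
    static final int SALT_BUCKETS = 8;

    // First stage: spread events for one customer across several salted keys,
    // so no single sub-task receives all of that customer's events.
    static String saltedKey(int customerId) {
        int salt = ThreadLocalRandom.current().nextInt(SALT_BUCKETS);
        return customerId + "#" + salt;
    }

    // Second stage: recover the original key to merge the partial aggregates.
    static int originalKey(String salted) {
        return Integer.parseInt(salted.substring(0, salted.indexOf('#')));
    }
}
```

The trade-off is a second shuffle and keyed state per salted key; applying the salt only to keys flagged as hot (e.g. via a broadcast stream of hot IDs) is one way to leave regular keys untouched.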

Any other suggestions for dealing with hot keys? Maybe a separate Flink app that first detects which customers are hot and puts them in a remote cache, plus a dedicated Flink app to process the events with hot keys?


1 Answer

Answer by kkrugler:

If you are doing a remote DB fetch for each customer, then use Flink's Async IO support with sufficient threading, and you shouldn't see "hot" sub-tasks because most of the time your workflow will be waiting for results from the DB.
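A minimal sketch of that Async I/O wiring, using Flink's `RichAsyncFunction` and `AsyncDataStream` from the DataStream API; the `DbClient`/`getPropertiesFor` names and the `Event`/`EnrichedEvent` types are hypothetical stand-ins for your DB client and records:

```java
// Assumes a non-blocking DB client whose calls return CompletableFuture.
class CustomerEnrichFunction extends RichAsyncFunction<Event, EnrichedEvent> {
    private transient DbClient client;

    @Override
    public void open(Configuration parameters) {
        client = new DbClient();  // create the async client once per sub-task
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        // Issue the DB request without blocking the task thread; complete the
        // ResultFuture from the client's callback when the response arrives.
        client.getPropertiesFor(event.customerId)
              .thenAccept(previous ->
                  resultFuture.complete(
                      Collections.singleton(EnrichedEvent.merge(event, previous))));
    }
}

// Wire it into the stream with a timeout and a cap on in-flight requests:
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
    events,                       // input stream of Event, keyed or not
    new CustomerEnrichFunction(),
    5, TimeUnit.SECONDS,          // per-request timeout
    100);                         // max concurrent requests per sub-task
```

The capacity parameter (100 here) is what provides the "sufficient threading": up to that many requests per sub-task can be in flight at once, so a hot key mostly costs waiting time rather than CPU on one sub-task.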

Note that your workflow will be rate-limited by how fast your DB can handle multiple requests. This is often the bottleneck for a Flink workflow using an external DB.

You can speed things up by caching results from the DB, which for the "hot key" case should be highly effective. Just use a simple in-memory cache (e.g. Java LinkedHashMap) in your RichAsyncFunction's asyncInvoke method.
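The LinkedHashMap approach can be sketched as a small LRU cache; the class name `DbCache` and the size cap are illustrative, not part of any Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A bounded LRU cache built on LinkedHashMap's access-order mode.
class DbCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public DbCache(int maxEntries) {
        // accessOrder = true makes iteration order least-recently-used first,
        // so the "eldest" entry is the best candidate for eviction.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least recently used entry once the cap is exceeded.
        return size() > maxEntries;
    }
}
```

One caveat: `LinkedHashMap` is not thread-safe, and async result callbacks may run on the DB client's threads, so either synchronize access to the cache or only touch it from `asyncInvoke` itself.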

Regards,

-- Ken