How to manage joining metadata against an event in Flink with large, rarely changing metadata

80 views Asked by At

We are looking at using flink for a rebuild of one of our systems, and are trying to think through some of the cases. As a simple example, let's say we have entities and entity activity. Whenever an activity event comes in, it just has a entity id and we want to join the activity with the entity before passing it on to more processing. The number of possible entities is very large (, the metadata about a user doesn't change often, but can change.

We see this as having a few ways of tackling it, the first being joining streams (a user update stream and the activity stream), but since the last update to a user could be a long time ago (possibly years) we can't really use this approach.

Another approach would be to use a stateful operation that stores the user metadata in it's state. When an activity comes in, we can lookup the user in the state and attach the metadata. The issue with this is the user metadata state can be come incredibly large in our case (in the billions). Keeping all entities in a state store for all of time seems less than ideal, although maybe is the best way forward?

Another approach would be to store our state in a DB and query it, but then we either have to only query the primary to ensure we have the most updated data or query a secondary and have to worry about replication delay and possibly missing an update.

The last approach I could think of is doing a hybrid where we use a stateful store with a ttl to keep it from growing too large and also push any updates that were done to the store to a db as well. We'll always try to pull from the state first, and if it doesn't exist we fall back to the DB. This means that any time the entity is updated it will be in the store for the ttl (guaranteeing we have the most up-to-date data). We could also push things to the store any time we had to fallback on the database (if an entity does something, they are likely to do more things so keep their info in the state).

My main concern with either of the approaches that use the db is related to checkpointing. If Flink needs to restore a checkpoint for it's state, I don't see how the db can also take that into account easily. It will have the state at 'head' while the stateful function will have state at the checkpoint. And let's say we had a bug in our code and need to restore a checkpoint/savepoint from a week ago. That's possibly a large difference between the stateful function and the db.

I'm wondering if there is some common pattern for handling cases like this

1

There are 1 answers

0
David Anderson On BEST ANSWER

You've outlined 4 approaches:

(1) join the user data and user activity streams
(2) store the user data in flink state
(3) lookup the user data from an external DB
(4) hybrid, using Flink state as a cache

I don't see any real difference between (1) and (2). A Flink SQL temporal join will behave just as described, and sounds to me like it should be tractable.

(3) sounds like a Flink SQL lookup join.

As for caching, Flink SQL lookup joins can use a cache, but you may not care for its semantics. The cache is in memory, and not in Flink state. Cached data is used whenever it is present. The cache has a TTL, but no effort is made to update the cache when the underlying DB is updated.