KeyBy data distribution in Apache Flink: logical or physical operator?


According to the Apache Flink documentation, the keyBy transformation logically partitions a stream into disjoint partitions, and all records with the same key are assigned to the same partition.

Is keyBy a purely logical transformation? Doesn't it also include physical data partitioning for distribution across the cluster nodes? If so, how can it guarantee that all records with the same key are assigned to the same partition?

For instance, suppose we are consuming a distributed data stream from an Apache Kafka cluster of n nodes, and the Flink cluster running our streaming job consists of m nodes. When the keyBy transformation is applied to the incoming data stream, how does it guarantee logical data partitioning? Or does it involve physical data partitioning across the cluster nodes?

It seems I am confused between logical and physical data partitioning.

1 Answer

David Anderson (accepted answer):

The keyspace of all possible keys is divided into some number of key groups. The number of key groups (which is the same as the maximum parallelism) is a configuration parameter you can set when setting up a Flink cluster; the default value is 128.
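For example, here is a minimal sketch of setting that value per job through the DataStream API (setMaxParallelism is the per-job setting; the 128 simply restates the default mentioned above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: set the maximum parallelism, i.e. the number of key groups, for this job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(128);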

Each key belongs to exactly one key group. When a cluster is launched, the key groups are divided among the task managers -- and if the cluster is started from a checkpoint or savepoint, those snapshots are indexed by key group, and each task manager loads the state for the keys in the key groups it has been assigned.

While a job is running, every task manager knows the key selector functions used to compute the keys, and how keys map onto key groups. The TMs also know the partitioning of key groups to task managers. This makes it straightforward to route each message to the task manager responsible for that message's key.
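As a concrete sketch of what this looks like from the API side (the sample "userId,action" strings and the split on "," are placeholders for illustration, not anything Flink-specific):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical sample stream of "userId,action" strings standing in for Kafka records.
DataStream<String> events = env.fromElements("alice,click", "bob,click", "alice,view");

// The lambda is the KeySelector. Its result is hashed into a key group,
// and the key group determines which parallel instance receives the record.
KeyedStream<String, String> byUser = events.keyBy(line -> line.split(",")[0]);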

Details:

The key group that a key belongs to is computed roughly like this:

// key is whatever your KeySelector returns for this record
Object key = keySelector.getKey(record);
int keyHash = key.hashCode();
// a murmur hash is applied on top of hashCode() to spread keys evenly across key groups
int keyGroupId = MathUtils.murmurHash(keyHash) % maxParallelism;

The index of the operator instance to which elements from a given key group should be routed, given the actual parallelism and maxParallelism, is computed (using integer arithmetic) as

keyGroupId * parallelism / maxParallelism

The actual code is in org.apache.flink.runtime.state.KeyGroupRangeAssignment if you want to take a look.
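Putting the two steps together, here is a small sketch that uses the static helpers from that class (assuming they keep the signatures shown here). With maxParallelism = 128 and parallelism = 4, a key that falls into key group 57 would be routed to operator index 57 * 4 / 128 = 1 (integer division):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

int maxParallelism = 128;  // number of key groups
int parallelism = 4;       // actual parallelism of the keyed operator
Object key = "user-42";    // hypothetical key, i.e. the result of the KeySelector

// 1. Which key group does this key fall into?
int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

// 2. Which operator instance (subtask index) owns that key group?
int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
        maxParallelism, parallelism, keyGroupId);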

One major takeaway is that the key groups are disjoint, and they span the keyspace. In other words, it's not possible for a key to come along that doesn't belong to one of the key groups. Every key belongs to exactly one of the key groups, and every key group belongs to one of the task managers.