Do FlinkSql join nodes individually maintain full state of both inputs?


Say you're doing a Flink SQL 3-way join between tables sourced from different Kafka topics:

CREATE TEMPORARY TABLE source1( col1 INT NOT NULL, col2 INT NOT NULL, ... ) with ('connector' = 'upsert-kafka', ... );
CREATE TEMPORARY TABLE source2( col1 INT NOT NULL, col2 INT NOT NULL,... ) with ('connector' = 'upsert-kafka', ... );
CREATE TEMPORARY TABLE source3( col1 INT NOT NULL, col2 INT NOT NULL,... ) with ('connector' = 'upsert-kafka', ... );

SELECT     s1.col2, s2.col2, s3.col2
FROM       source1 s1
INNER JOIN source2 s2 ON s2.col1 = s1.col1
INNER JOIN source3 s3 ON s3.col1 = s1.col1

What exactly will Flink keep in memory/stored in its state when performing this join?

If the Flink web UI is to be believed, two nodes participate in this operation: one node joining source1 with source2, and another node joining the output of that join with source3.
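The same split can be checked without the UI by printing the plan. This is a sketch that assumes the three tables above are registered in the current session; the exact plan text varies by Flink version, but with join reordering left at its default (disabled) it is expected to show two binary Join operators, the second one taking the first one's output as its left input. The column aliases are added only to keep the output names distinct.

```sql
-- Print the optimized plan of the 3-way join; look for two Join nodes in the output.
EXPLAIN PLAN FOR
SELECT     s1.col2 AS s1_col2, s2.col2 AS s2_col2, s3.col2 AS s3_col2
FROM       source1 s1
INNER JOIN source2 s2 ON s2.col1 = s1.col1
INNER JOIN source3 s3 ON s3.col1 = s1.col1;
```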

By that logic, I would imagine that the first node needs to store the full state of source1 and source2. The second node, however, would then need to store the full state of Tuple2<source1,source2> (as produced by the first node) plus source3.

In other words, the memory/state size Flink uses would exceed sizeof(source1) + sizeof(source2) + sizeof(source3), because in addition to the source topics, node2 would need to hold on to the state of the join result of (source1, source2) as well.
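Under the cascaded model just described, the state can be counted in rows. Here $n_1, n_2, n_3$ are the numbers of live rows in source1, source2 and source3, and $n_{12}$ is the number of rows produced by joining source1 with source2 (symbols introduced for illustration; per-row size, projected columns and RocksDB overhead are ignored):

$$
\text{rows in state} \approx \underbrace{(n_1 + n_2)}_{\text{join node 1}} + \underbrace{(n_{12} + n_3)}_{\text{join node 2}}
$$

For instance, if col1 were a unique key in every table (an assumption) and every key in source1 also appeared in source2, then $n_{12} = n_1$ and the total is about $2n_1 + n_2 + n_3$. For a left-deep chain of $k$ tables the same reasoning gives $\sum_{i=1}^{k} n_i + \sum_{j=2}^{k-1} n_{1 \ldots j}$, where $n_{1 \ldots j}$ is the size of the intermediate result after joining the first $j$ tables.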

If that were true, I'd imagine that deeply nested joins in Flink SQL could blow up memory quite a bit, well beyond the sum of the sizes of the individual input streams.

So is my assumption right here? Or does Flink do anything clever here to reduce the state it needs to hang on to?


There are 2 answers

David Anderson (best answer)

There's some limited cleverness:

(1) Only the columns needed to produce the desired result are stored.
(2) The state can be kept in RocksDB rather than in memory.
(3) Time constraints and watermarks can be used to drop state that is no longer needed.
(4) State TTL can be used to drop older state (at the risk of producing incomplete results). See Martijn's answer for more info.
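Point (3) can be sketched with an interval join. This is a minimal example under assumptions: the tables `events_a` and `events_b`, their columns, and the `datagen` settings are hypothetical stand-ins for append-only sources with an event-time attribute. Interval joins are intended for append-only inputs, so upsert-kafka tables like the ones in the question would not be a direct fit.

```sql
-- Hypothetical append-only sources with an event-time attribute and a watermark.
CREATE TEMPORARY TABLE events_a (
  id  INT NOT NULL,
  val INT NOT NULL,
  ts  TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'fields.id.min' = '1',
  'fields.id.max' = '100'
);

CREATE TEMPORARY TABLE events_b (
  id  INT NOT NULL,
  val INT NOT NULL,
  ts  TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'fields.id.min' = '1',
  'fields.id.max' = '100'
);

-- Interval join: a row only matches rows whose ts lies within 10 minutes of its own,
-- so once the watermark has passed that window the row can be dropped from state.
SELECT a.val AS val_a, b.val AS val_b
FROM   events_a a, events_b b
WHERE  a.id = b.id
  AND  b.ts BETWEEN a.ts - INTERVAL '10' MINUTE AND a.ts + INTERVAL '10' MINUTE;
```

Unlike the regular joins in the question, the time bound lets Flink clean up state on its own without affecting the result, instead of keeping every row forever.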

Martijn Visser

From the Flink documentation on joins:

For streaming queries, the grammar of regular joins is the most flexible and allow for any kind of updating (insert, update, delete) input table. However, this operation has important operational implications: it requires to keep both sides of the join input in Flink state forever. Thus, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result.
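A minimal sketch of such a query configuration, assuming the statements are run in the Flink SQL client against the tables from the question. `table.exec.state.ttl` is Flink's option for idle state retention; the one-hour value is an arbitrary example. State that has not been updated for roughly that long is dropped, which is the correctness trade-off the quote mentions: a late row whose match has expired will not produce a join result.

```sql
-- Retain idle state for about one hour (example value); applies to the stateful
-- operators of queries submitted afterwards, including both join nodes.
SET 'table.exec.state.ttl' = '1 h';

SELECT     s1.col2 AS s1_col2, s2.col2 AS s2_col2, s3.col2 AS s3_col2
FROM       source1 s1
INNER JOIN source2 s2 ON s2.col1 = s1.col1
INNER JOIN source3 s3 ON s3.col1 = s1.col1;
```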