Say you're doing a Flink SQL 3-way join between tables sourced from different Kafka topics:
```sql
CREATE TEMPORARY TABLE source1 (col1 INT NOT NULL, col2 INT NOT NULL, ...) WITH ('connector' = 'upsert-kafka', ...);
CREATE TEMPORARY TABLE source2 (col1 INT NOT NULL, col2 INT NOT NULL, ...) WITH ('connector' = 'upsert-kafka', ...);
CREATE TEMPORARY TABLE source3 (col1 INT NOT NULL, col2 INT NOT NULL, ...) WITH ('connector' = 'upsert-kafka', ...);

SELECT s1.col2, s2.col2, s3.col2
FROM source1 s1
INNER JOIN source2 s2 ON s2.col1 = s1.col1
INNER JOIN source3 s3 ON s3.col1 = s1.col1;
```
What exactly will Flink keep in memory (or in its state backend) when performing this join?
If the web UI is to be believed, two nodes participate in this operation: one node joins source1 with source2, and a second node joins that output with source3.
By that logic, I would imagine that the first node needs to store the full state of source1 + source2. In addition, the second node would need to store the full state of Tuple2<source1,source2> (as produced by the first node) + source3.
In other words, the memory/state size Flink uses would exceed sizeof(source1) + sizeof(source2) + sizeof(source3), because in addition to the source topics, node2 would also need to hold on to the result of joining source1 with source2.
If that were true, I'd imagine that deeply nested joins in Flink SQL could blow up state considerably, well beyond the sum of the sizes of the individual input streams.
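The reasoning in the question can be written down as a rough state-size model. It assumes each binary join node behaves like a regular (non-windowed) Flink join that keeps both of its inputs in state, and it ignores column pruning and serialization overhead. Here $S_i$ is the $i$-th source, $|\cdot|$ is the amount of state an input contributes, and $k$ is the number of inputs in a left-deep chain of joins.

```latex
% two-node plan from the query above (k = 3)
\text{state}_{\text{node1}} = |S_1| + |S_2|, \qquad
\text{state}_{\text{node2}} = |S_1 \bowtie S_2| + |S_3|

% general left-deep cascade of k inputs: every intermediate result is materialized once
\text{state}_{\text{total}} = \sum_{i=1}^{k} |S_i| + \sum_{j=2}^{k-1} \left| S_1 \bowtie \cdots \bowtie S_j \right|
```

As a worked case, if each of the three sources holds $N$ rows with a unique `col1` and every key matches, node2 stores $N$ joined rows on its left side, so roughly $4N$ rows are kept instead of $3N$, and the joined rows are wider than any single input row. If `col1` is not unique, $|S_1 \bowtie S_2|$ can grow up to $|S_1| \cdot |S_2|$ rows.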
So is my assumption right? Or does Flink do anything clever to reduce the state it needs to hold on to?
There's some limited cleverness:
(1) Only the columns needed to produce the desired result are stored.
(2) The state can be kept in RocksDB rather than in memory.
(3) Time constraints (as in interval joins) and watermarks let Flink drop state that can no longer produce matches.
(4) State TTL can be used to drop older state (at the risk of producing incomplete results). See Martijn's answer for more info.
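Points (2) and (4) are configuration rather than query changes. A minimal sketch for the SQL client, assuming the key names match your Flink version (newer releases use `state.backend.type`, older ones `state.backend`) and that your deployment accepts the state backend as a session setting; otherwise it belongs in the cluster configuration. The one-hour TTL is an arbitrary example value, not a recommendation.

```sql
-- Assumption: newer key name; older Flink releases use 'state.backend' = 'rocksdb'.
-- Keep keyed state (including both sides of each join) in RocksDB,
-- i.e. on local disk rather than on the JVM heap.
SET 'state.backend.type' = 'rocksdb';

-- Idle state retention for SQL operators such as regular joins.
-- Entries not written for at least 1 hour become eligible for cleanup;
-- a row arriving later whose match has expired will not be joined.
-- The default of 0 keeps state forever.
SET 'table.exec.state.ttl' = '1 h';
```

Neither setting changes how many rows each join node holds in principle: RocksDB moves that state off-heap, while the TTL bounds how long it is retained.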