I have my SQL defined as:
CREATE TABLE IF NOT EXISTS TABLE_1 (
headers VARCHAR NOT NULL,
id VARCHAR NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NULL,
type VARCHAR NOT NULL,
contentJson VARCHAR NOT NULL
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'table_1__.+?',
'properties.bootstrap.servers' = 'localhost:29092',
'properties.group.id' = 'table_1__raw_step1_local_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE IF NOT EXISTS TABLE_2 (
headers VARCHAR NOT NULL,
id VARCHAR NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NULL,
type VARCHAR NOT NULL,
contentJson VARCHAR NOT NULL
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'table_2__.+?',
'properties.bootstrap.servers' = 'localhost:29092',
'properties.group.id' = 'table_2__raw_step1_local_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
INSERT INTO FINAL_TABLE
SELECT t1.headers, t2.contentJson
FROM TABLE_1 t1 LEFT JOIN TABLE_2 t2 ON (t1.id = t2.id);
Here the join TABLE_1 t1 left join TABLE_2 t2 on (t1.id = t2.id) is stored in state. I set the state backend with streamExecutionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(true)); and I can see checkpoints being written. I have pointed the checkpoint directory to the /tmp folder, where I see a _metadata file and a shared folder related to this job.
I want to query the state produced by this join. Is there an example of how to do this?
There isn't any tooling for doing that in a reasonable way.
If you dig through the source code for the SQL join operator, you can find the names and serializers used by the relevant state objects. And with that information you might be able to use the state processor API to read this state from a savepoint.
I can't guarantee this will work; I'm not aware that anyone has tried this with the state from SQL operators.
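Whichever way you try this, the state processor API needs the path to one completed snapshot, so a first step is finding the right directory under /tmp. The sketch below uses only the JDK and assumes the usual layout of <checkpoint-dir>/<job-id>/chk-<n>/_metadata, with shared and taskowned as siblings of the chk-<n> folders. The class name LatestCheckpointFinder and its method names are made up for illustration. It does not read any state; it only picks the newest chk-<n> folder that already contains a _metadata file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: locates the newest completed checkpoint
// inside one job's checkpoint directory (assumed layout: <job-id>/chk-<n>/_metadata).
public class LatestCheckpointFinder {

    // Returns the chk-<n> directory with the highest n that contains a _metadata file.
    public static Optional<Path> findLatestCheckpoint(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    // Skips "shared" and "taskowned", which hold files referenced by checkpoints.
                    .filter(dir -> dir.getFileName().toString().matches("chk-\\d+"))
                    // A checkpoint without _metadata is treated as incomplete and ignored.
                    .filter(dir -> Files.isRegularFile(dir.resolve("_metadata")))
                    // Compare numerically so that chk-10 ranks above chk-2.
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    // Extracts n from a directory named chk-<n>.
    static long checkpointId(Path chkDir) {
        String name = chkDir.getFileName().toString();
        return Long.parseLong(name.substring("chk-".length()));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: LatestCheckpointFinder <checkpoint-dir>/<job-id>");
            return;
        }
        Optional<Path> latest = findLatestCheckpoint(Paths.get(args[0]));
        if (latest.isPresent()) {
            System.out.println("Latest completed checkpoint: " + latest.get());
        } else {
            System.out.println("No completed checkpoint found in " + args[0]);
        }
    }
}
```

Run it with the job's folder under /tmp as the only argument. Checkpoints are normally removed when a job is cancelled unless they are configured to be retained, so taking an explicit savepoint may be the safer input for experiments with the state processor API.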