How to read RocksDB state when using Flink SQL to join two sources


I have my SQL defined as:

CREATE TABLE IF NOT EXISTS TABLE_1 (
    headers     VARCHAR NOT NULL,
    id          VARCHAR NOT NULL,
    `timestamp` TIMESTAMP_LTZ(3) NULL,
    type        VARCHAR NOT NULL,
    contentJson VARCHAR NOT NULL
) WITH (
    'connector' = 'kafka',
    'topic-pattern' = 'table_1__.+?',
    'properties.bootstrap.servers' = 'localhost:29092',
    'properties.group.id' = 'table_1__raw_step1_local_1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval' = '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);


CREATE TABLE IF NOT EXISTS TABLE_2 (
    headers     VARCHAR NOT NULL,
    id          VARCHAR NOT NULL,
    `timestamp` TIMESTAMP_LTZ(3) NULL,
    type        VARCHAR NOT NULL,
    contentJson VARCHAR NOT NULL
) WITH (
    'connector' = 'kafka',
    'topic-pattern' = 'table_2__.+?',
    'properties.bootstrap.servers' = 'localhost:29092',
    'properties.group.id' = 'table_2__raw_step1_local_1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval' = '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

INSERT INTO FINAL_TABLE
SELECT t1.headers, t2.contentJson
FROM TABLE_1 t1 LEFT JOIN TABLE_2 t2 ON (t1.id = t2.id);

Here the join TABLE_1 t1 left join TABLE_2 t2 on (t1.id = t2.id) is kept in Flink state. I set the state backend with streamExecutionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(true)); and I can see checkpoints being written. I have pointed the checkpoint directory at /tmp, where I see a _metadata file and a shared folder related to this job.

I want to query the state produced by this join. Is there an example for this?


There is 1 answer

David Anderson

There isn't any tooling for doing that in a reasonable way.

If you dig through the source code for the SQL join operator, you can find the names and serializers used by the relevant state objects. With that information, you might be able to use the State Processor API to read this state from a savepoint.

I can't guarantee this will work; I'm not aware that anyone has tried this with the state from SQL operators.
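
Whichever way the state ends up being read, the reader job has to be pointed at one concrete snapshot directory. The following is only a minimal sketch of that first step. It assumes the usual Flink checkpoint layout, where each completed checkpoint is a chk-<n> directory containing a _metadata file, next to the shared folder mentioned in the question. CheckpointLocator is a hypothetical helper, not a Flink class, and the /tmp default is only a placeholder for the real job directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper (not part of Flink): finds the newest completed
// checkpoint under a job's checkpoint directory.
class CheckpointLocator {

    // Returns the highest-numbered chk-<n> directory that contains a
    // _metadata file. Directories without _metadata are treated as
    // incomplete and skipped.
    static Optional<Path> latestCompletedCheckpoint(Path jobCheckpointDir) {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(CheckpointLocator::checkpointId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Parses the numeric id out of a directory name such as "chk-42",
    // so that chk-10 sorts after chk-2.
    private static long checkpointId(Path chkDir) {
        String name = chkDir.getFileName().toString();
        return Long.parseLong(name.substring("chk-".length()));
    }

    public static void main(String[] args) {
        // Assumption: the first argument is the job's checkpoint directory;
        // "/tmp" is only a placeholder default.
        Path jobDir = Paths.get(args.length > 0 ? args[0] : "/tmp");
        System.out.println(latestCompletedCheckpoint(jobDir)
                .map(p -> "Latest completed checkpoint: " + p)
                .orElse("No completed checkpoint found under " + jobDir));
    }
}
```

The path this prints is what a State Processor API job would be given as its input. A savepoint taken explicitly from the running job is the route described above; whether a retained incremental RocksDB checkpoint can be read the same way in this setup is something to verify rather than assume.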