Flink Table print connector not being called

I am using the Flink Table API to pull data from a Kinesis stream into a table. I want to periodically write that data into a temporary table and run a custom scalar function on it. However, I notice that my scalar function is not being called at all.

Here is the code for the Kinesis table:

    this.tableEnv.executeSql("CREATE TABLE transactions (\n" +
        "    entry  STRING,\n" +
        "    sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL,\n" +
        "    shard_id VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,\n" +
        "    arrival_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,\n" +
        "    WATERMARK FOR arrival_time AS arrival_time - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "    'connector' = 'kinesis',\n" +
        "    'stream'     = '" + streamName + "',\n" +
        "    'aws.region' = 'us-west-2', \n" +
        "    'format'    = 'raw'\n" +
        ")");

Then, I want to apply a one-second tumbling window that pulls data from Kinesis and writes it to a temporary table.

My temporary table is defined like this:

    this.tableEnv.executeSql("CREATE TABLE temporaryTable (\n" +
        "    entry STRING,\n" +
        "    sequence_number VARCHAR(128) NOT NULL,\n" +
        "    shard_id VARCHAR(128) NOT NULL,\n" +
        "    arrival_time     TIMESTAMP(3),\n" +
        "    record_list STRING NOT NULL,\n" +
        "    PRIMARY KEY (shard_id, sequence_number) NOT ENFORCED" +
        ") WITH (\n" +
        "   'connector'  = 'print'\n" +
        ")");

I then have code to do the tumbling:

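    // Assumes static imports of Expressions.$, Expressions.lit and Expressions.call.
    Table transactions = this.tableEnv.from("transactions");

    // Group rows into 1-second tumbling windows on the event-time attribute.
    Table inMemoryTable = transactions
            .window(Tumble.over(lit(1).second()).on($("arrival_time")).as("log_ts"))
            .groupBy($("entry"), $("sequence_number"), $("log_ts"), $("shard_id"), $("arrival_time"))
            .select(
                $("entry"),
                $("sequence_number"), $("shard_id"), $("arrival_time"),
                call(CustomFunction.class, $("entry")).as("record_list"));

    // executeInsert takes the name of the sink table as a string.
    inMemoryTable.executeInsert("temporaryTable");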
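
For reference, the sketch below illustrates what a one-second tumbling window does with `arrival_time` values: each timestamp maps to exactly one window, and an event-time window can only emit once the watermark reaches the window's last millisecond. This is a plain-Java toy model, not Flink code; the class name `TumbleSketch`, its methods and the sample timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleSketch {
    // Window size matching Tumble.over(lit(1).second()).
    static final long WINDOW_SIZE_MS = 1_000L;

    /** Start of the tumbling window [start, start + size) that contains the timestamp. */
    public static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    /** An event-time window may fire once the watermark reaches its last millisecond. */
    public static boolean canFire(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in epoch milliseconds.
        long[] arrivalTimes = {1_000L, 1_250L, 1_999L, 2_000L};

        // Assign every timestamp to its window, keyed by window start.
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : arrivalTimes) {
            windows.computeIfAbsent(windowStart(ts), k -> new ArrayList<>()).add(ts);
        }

        // Hypothetical current watermark; only windows that end at or before it can emit.
        long watermark = 1_999L;
        for (Map.Entry<Long, List<Long>> e : windows.entrySet()) {
            System.out.println("window starting at " + e.getKey() + " holds " + e.getValue()
                    + ", can fire: " + canFire(e.getKey(), watermark));
        }
    }
}
```

The point of the model is that rows sitting in a window stay invisible to the sink until the watermark moves past that window, so a stalled watermark looks the same as "nothing is being called".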

The CustomFunction class looks like this:

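    import java.io.IOException;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;

    public class CustomFunction extends ScalarFunction {
      // Placeholder implementation: returns a constant for every input row.
      @DataTypeHint("STRING")
      public String eval(
          @DataTypeHint("STRING") String serializedEntry) throws IOException {
        return "asd";
      }
    }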

When I run this code in Flink, I don't get anything in stdout, so I am obviously missing something.

Here is the Flink UI (image posted as a link because I don't have enough reputation to embed it).

Thanks for any help.

1 Answer

Answer by Gururaj Kosuru

I am able to get the stream to print with:

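    // Mark a source input as idle after 10 s without records so it stops holding back the watermark.
    driver.tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10000 ms");
    // Emit watermarks every 5 s.
    driver.env.getConfig().setAutoWatermarkInterval(5000);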
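
The answer does not say why these settings help. A plausible explanation is that the downstream watermark is the minimum over all parallel source inputs, so an input that receives no records keeps the watermark from advancing and the tumbling window never emits. The sketch below is a plain-Java toy model of that rule, not Flink's implementation; the class `IdleInputSketch`, the method `combinedWatermark` and the sample values are hypothetical.

```java
public class IdleInputSketch {
    // Mirrors WATERMARK FOR arrival_time AS arrival_time - INTERVAL '5' SECOND.
    static final long DELAY_MS = 5_000L;
    // Marker for "this input has not produced a watermark yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Toy rule: the combined watermark is the minimum watermark over all inputs
     * that are not idle. An idle timeout of 0 disables idleness detection.
     */
    public static long combinedWatermark(long[] maxEventTimeMs, long[] lastActivityMs,
                                         long nowMs, long idleTimeoutMs) {
        long combined = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < maxEventTimeMs.length; i++) {
            boolean idle = idleTimeoutMs > 0 && nowMs - lastActivityMs[i] >= idleTimeoutMs;
            if (idle) {
                continue; // idle inputs no longer hold the watermark back
            }
            anyActive = true;
            long watermark = maxEventTimeMs[i] == NO_WATERMARK
                    ? NO_WATERMARK
                    : maxEventTimeMs[i] - DELAY_MS;
            combined = Math.min(combined, watermark);
        }
        // Simplification: with no active input this model reports no watermark at all.
        return anyActive ? combined : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Input 0 has seen events up to t = 20 s; input 1 has never received a record.
        long[] maxEventTime = {20_000L, NO_WATERMARK};
        long[] lastActivity = {30_000L, 0L};
        long now = 30_000L;
        long windowEnd = 11_000L; // window [10 s, 11 s)

        long withoutTimeout = combinedWatermark(maxEventTime, lastActivity, now, 0L);
        long withTimeout = combinedWatermark(maxEventTime, lastActivity, now, 10_000L);

        // prints "fires without idle timeout: false"
        System.out.println("fires without idle timeout: " + (withoutTimeout >= windowEnd - 1));
        // prints "fires with 10 s idle timeout: true"
        System.out.println("fires with 10 s idle timeout: " + (withTimeout >= windowEnd - 1));
    }
}
```

If this is indeed the cause, it would typically show up when the source parallelism is higher than the number of Kinesis shards, or when some shards receive no traffic.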