I am using the Flink table API to pull data from a kinesis topic into a table. I want to periodically pull that data into a temporary table and run a custom scalar function on it. However, I notice that my scalar function is not being called at all.
Here is the code for the Kinesis table :
this.tableEnv.executeSql("CREATE TABLE transactions (\n" +
" entry STRING,\n" +
" sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL,\n" +
" shard_id VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,\n" +
" arrival_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,\n" +
" WATERMARK FOR arrival_time AS arrival_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kinesis',\n" +
" 'stream' = '" + streamName + "',\n" +
" 'aws.region' = 'us-west-2', \n" +
" 'format' = 'raw'\n" +
")");
Then, I want to periodically call a tumble every second which pulls data from kinesis and updates a temporary table.
My temporary table is defined like this:
this.tableEnv.executeSql("CREATE TABLE temporaryTable (\n" +
" entry STRING,\n" +
" sequence_number VARCHAR(128) NOT NULL,\n" +
" shard_id VARCHAR(128) NOT NULL,\n" +
" arrival_time TIMESTAMP(3),\n" +
" record_list STRING NOT NULL,\n" +
" PRIMARY KEY (shard_id, sequence_number) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")");
I then have a code to do the tumbling :
Table inMemoryTable = transactions.
window(Tumble.over(lit(1).second()).on($("arrival_time")).as("log_ts"))
.groupBy($("entry"), $("sequence_number"), $("log_ts"), $("shard_id"), $("arrival_time"))
.select(
$("entry"),
$("sequence_number"), $("shard_id"), $("arrival_time"),
(call(CustomFunction.class, $("entry")).as("record_list")));
inMemoryTable.executeInsert(temporaryTable)
The CustomFunction class looks like this :
public class CustomFunction extends ScalarFunction {
@DataTypeHint("STRING")
public String eval(
@DataTypeHint("STRING") String serializedEntry) throws IOException {
return "asd";
}
When I run this code in Flink, I dont get anything in the stdout so obviously I am missing something.
Here is the Flink UI:
Image as link as I dont have enough rep
Thanks for any help.
I am able to get the stream to print with: