Flink SQL Unit Testing: How to Assign Watermark?


I'm writing a unit test for a Flink SQL statement that uses MATCH_RECOGNIZE. I'm setting up the test data like this:

Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
  DataTypes.FIELD("foobar", DataTypes.STRING()),
  ....
  ),
  row(...),
  row(...)
);

I have two questions:

  • How do I designate event_time as the rowtime attribute, i.e. the field used for watermarking?
  • Less important: how do I give the created table a meaningful name?

FLINK VERSION: 1.11

Best answer, by morsapaes

You've hit a current limitation of the Table API: it's not possible to define watermarks and rowtime attributes in combination with the fromValues method; you need a connector. There are a couple of options to work around it:

1. Use a CSV connector that you back with your VALUES rows, as shown in this example.

2. Use the built-in DataGen connector. Since you're putting together a unit test for CEP, I imagine that you want some degree of control over the data that is generated, so this is probably not a viable option. Thought I'd mention it, anyways.
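For option 1, here is a minimal sketch of what such a fixture might look like. It is not taken from the linked example: the class and method names (CsvFixture, writeRows, createTableDdl) are made up for illustration, and it assumes the filesystem connector with the csv format (and the flink-csv dependency) is available in your test classpath. The text format that the CSV format accepts for TIMESTAMP(3) values may differ between Flink versions, so check it before writing the rows.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical test helper; names are illustrative, not part of Flink.
class CsvFixture {

    // Writes one CSV line per test row to a temporary file and returns its path.
    public static Path writeRows(List<String> rows) throws IOException {
        Path file = Files.createTempFile("events", ".csv");
        Files.write(file, rows);
        return file;
    }

    // Builds a CREATE TABLE statement that reads the file and declares
    // event_time as the rowtime attribute via the WATERMARK clause.
    // Assumption: 'filesystem' connector and 'csv' format are on the classpath.
    public static String createTableDdl(String tableName, Path csvFile) {
        return "CREATE TABLE " + tableName + " (\n"
             + "  event_time TIMESTAMP(3),\n"
             + "  foobar STRING,\n"
             + "  WATERMARK FOR event_time AS event_time\n"
             + ") WITH (\n"
             + "  'connector' = 'filesystem',\n"
             + "  'path' = '" + csvFile.toUri() + "',\n"
             + "  'format' = 'csv'\n"
             + ")";
    }
}
```

In a test you would pass the result of createTableDdl to tEnv.executeSql(...) and then query the table by name, which also takes care of the naming question.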

Note: Using SQL DDL syntax is the recommended way to create tables as of Flink 1.10. This makes both things you're trying to do (i.e. defining a watermark and naming your table) more straightforward:

tEnv.executeSql("CREATE TABLE table_name (\n" +
                "             event_time TIMESTAMP(3),\n" +
                "             foobar STRING,\n" +
                "             WATERMARK FOR event_time AS event_time\n" +
                ") WITH (...)"
);

Table data = tEnv.from("table_name");

The watermark is declared as a computed column and there are multiple watermark strategies you can opt to use. Please check this documentation page for more details.
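To make the strategies concrete, here is a small sketch of the three WATERMARK clause forms described in the Flink CREATE TABLE documentation. The helper class WatermarkClause and its method names are hypothetical; only the generated SQL text is meant to mirror Flink's syntax.

```java
// Hypothetical helper that renders the WATERMARK clause for a rowtime column.
class WatermarkClause {

    // Strictly ascending timestamps: the watermark equals the largest
    // timestamp seen so far, so rows with an equal timestamp count as late.
    public static String strictlyAscending(String column) {
        return "WATERMARK FOR " + column + " AS " + column;
    }

    // Ascending timestamps: the watermark trails the largest timestamp by
    // one millisecond, so rows with an equal timestamp are not late.
    public static String ascending(String column) {
        return "WATERMARK FOR " + column + " AS " + column
             + " - INTERVAL '0.001' SECOND";
    }

    // Bounded out-of-orderness: the watermark trails the largest timestamp
    // by a fixed delay, here given in whole seconds.
    public static String boundedOutOfOrder(String column, int delaySeconds) {
        return "WATERMARK FOR " + column + " AS " + column
             + " - INTERVAL '" + delaySeconds + "' SECOND";
    }
}
```

For a unit test with hand-written, ordered rows, the strictly ascending form used in the answer is usually enough; the bounded variant is worth trying if the test data is deliberately out of order.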